Sdílet prostřednictvím


Orchestrace pracovních postupů architektury Microsoft Agent – sekvenční

V sekvenční orchestraci jsou agenti uspořádáni do řetězce. Každý agent zpracovává úlohu postupně a předává výstup dalšímu agentu v pořadí. To je ideální pro pracovní postupy, ve kterých každý krok vychází z předchozího kroku, jako je kontrola dokumentů, kanály zpracování dat nebo vícefázové odůvodnění.

Sekvenční orchestrace

Důležité

Úplná historie konverzací z předchozích agentů se předává dalšímu agentovi v sekvenci. Každý agent může zobrazit všechny předchozí zprávy, což umožňuje zpracování s podporou kontextu.

Co se naučíte

  • Jak vytvořit sekvenční potrubí agentů
  • Jak propojit agenty, kde každý navazuje na předchozí výstup
  • Jak přidat schválení s lidským zapojením pro citlivá volání nástrojů
  • Jak kombinovat agenty s vlastními exekutory pro specializované úlohy
  • Jak sledovat tok konverzace prostřednictvím pipeline

Definování agentů

V sekvenční orchestraci jsou agenti uspořádáni do kanálu, kde každý agent zpracovává úlohu postupně a předává výstup dalšímu agentu v pořadí.

Nastavení klienta Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetChatClient(deploymentName)
    .AsIChatClient();

Výstraha

DefaultAzureCredential je vhodný pro vývoj, ale vyžaduje pečlivé zvážení v produkčním prostředí. V produkčním prostředí zvažte použití konkrétních přihlašovacích údajů (např ManagedIdentityCredential. ) k zabránění problémům s latencí, neúmyslnému testování přihlašovacích údajů a potenciálním bezpečnostním rizikům z náhradních mechanismů.

Vytvořte specializované agenty, které budou fungovat postupně:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Nastavení sekvenční orchestrace

Sestavení pracovního postupu pomocí AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Spuštění sekvenčního pracovního postupu

Spusťte pracovní postup a zpracujte události:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Content}");
}

Ukázkový výstup

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Sekvenční orchestrace se zapojením člověka

Postupné orchestrace podporují interakce řízené lidmi prostřednictvím schvalování nástrojů. Když agenti používají nástroje zabalené ApprovalRequiredAIFunction, pracovní postup se pozastaví a vygeneruje RequestInfoEvent obsahující ToolApprovalRequestContent. Externí systémy (například lidské operátory) můžou zkontrolovat volání nástroje, schválit nebo odmítnout a pracovní postup se odpovídajícím způsobem obnoví.

Sekvenční orchestrace s lidskou kontrolou

Návod

Další podrobnosti o modelu požadavků a odpovědí najdete v tématu Human-in-the-Loop.

Definování agentů pomocí nástrojů vyžadujících schválení

Vytvořte agenty, ve kterých jsou citlivé nástroje zabalené:ApprovalRequiredAIFunction

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Sestavení a spuštění pomocí zpracování schválení

Normálně sestavte sekvenční pracovní postup. Tok schválení se zpracovává prostřednictvím streamu událostí:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Poznámka:

AgentWorkflowBuilder.BuildSequential() podporuje schválení nástroje bez nutnosti další konfigurace. Když agent volá nástroj zabalený ApprovalRequiredAIFunction, pracovní postup se automaticky pozastaví a vygeneruje RequestInfoEvent.

Návod

Kompletní spustitelný příklad procesu schvalování najdete v ukázceGroupChatToolApproval. Stejný RequestInfoEvent vzor zpracování platí i pro jiné orchestrace.

Klíčové koncepty

  • Sekvenční zpracování: Každý agent zpracovává výstup předchozího agenta v pořadí.
  • AgentWorkflowBuilder.BuildSequential(): Vytvoří sekvenční pracovní postup z kolekce agentů.
  • ChatClientAgent: Představuje agenta zálohovaného chatovacím klientem s konkrétními pokyny.
  • InProcessExecution.RunStreamingAsync(): Spustí pracovní postup a pro streamování událostí v reálném čase vrátí StreamingRun.
  • Zpracování událostí: Monitorování průběhu agenta prostřednictvím AgentResponseUpdateEvent a dokončení prostřednictvím WorkflowOutputEvent
  • Schválení nástroje: Zabalte citlivé nástroje tak, aby před provedením ApprovalRequiredAIFunction vyžadovaly lidské schválení.
  • RequestInfoEvent: Vygenerováno, když nástroj vyžaduje schválení; obsahuje ToolApprovalRequestContent podrobnosti o volání nástroje.

V sekvenční orchestraci každý agent zpracovává úlohu postupně, přičemž výstup plynule přechází z jednoho agenta na druhého. Začněte definováním agentů pro dvoufázový proces:

import os
from agent_framework.azure import AzureOpenAIResponsesClient
from azure.identity import AzureCliCredential

# 1) Create agents using AzureOpenAIResponsesClient
chat_client = AzureOpenAIResponsesClient(
    project_endpoint=os.environ["AZURE_AI_PROJECT_ENDPOINT"],
    deployment_name=os.environ["AZURE_AI_MODEL_DEPLOYMENT_NAME"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Nastavení sekvenční orchestrace

Třída SequentialBuilder vytvoří kanál, ve kterém agenti zpracovávají úlohy v pořadí. Každý agent vidí celou historii konverzací a přidá odpověď:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Spuštění sekvenčního pracovního postupu

Spusťte pracovní postup a shromážděte konečnou konverzaci zobrazující příspěvek jednotlivých agentů:

from typing import Any, cast
from agent_framework import Message, WorkflowEvent

# 3) Run and print final conversation
outputs: list[list[Message]] = []
async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output":
        outputs.append(cast(list[Message], event.data))

if outputs:
    print("===== Final Conversation =====")
    messages: list[Message] = outputs[-1]
    for i, msg in enumerate(messages, start=1):
        name = msg.author_name or ("assistant" if msg.role == "assistant" else "user")
        print(f"{'-' * 60}\n{i:02d} [{name}]\n{msg.text}")

Ukázkový výstup

===== Final Conversation =====
------------------------------------------------------------
01 [user]
Write a tagline for a budget-friendly eBike.
------------------------------------------------------------
02 [writer]
Ride farther, spend less—your affordable eBike adventure starts here.
------------------------------------------------------------
03 [reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Pokročilé: Kombinování agentů s vlastními exekutory

Sekvenční orchestrace podporuje kombinování agentů s vlastními exekutory pro specializované zpracování. To je užitečné, když potřebujete vlastní logiku, která nevyžaduje LLM:

Definování vlastního exekutoru

Poznámka:

Když vlastní exekutor následuje agenta v sekvenci, jeho obslužná rutina obdrží AgentExecutorResponse (protože agenti jsou interně zabalení AgentExecutor). Umožňuje agent_response.full_conversation přístup k celé historii konverzací.

from agent_framework import AgentExecutorResponse, Executor, WorkflowContext, handler
from agent_framework import Message

class Summarizer(Executor):
    """Simple summarizer: consumes full conversation and appends an assistant summary."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[list[Message]]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.send_message([Message("assistant", ["No conversation to summarize."])])
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.send_message(list(agent_response.full_conversation) + [summary])

Sestavení smíšeného sekvenčního pracovního postupu

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Ukázkový výstup s využitím vlastního exekutoru

------------------------------------------------------------
01 [user]
Explain the benefits of budget eBikes for commuters.
------------------------------------------------------------
02 [content]
Budget eBikes offer commuters an affordable, eco-friendly alternative to cars and public transport.
Their electric assistance reduces physical strain and allows riders to cover longer distances quickly,
minimizing travel time and fatigue. Budget models are low-cost to maintain and operate, making them accessible
for a wider range of people. Additionally, eBikes help reduce traffic congestion and carbon emissions,
supporting greener urban environments. Overall, budget eBikes provide cost-effective, efficient, and
sustainable transportation for daily commuting needs.
------------------------------------------------------------
03 [assistant]
Summary -> users:1 assistants:1

Sekvenční orchestrace pomocí smyčky human-in-the-loop

Sekvenční orchestrace podporují interakce s člověkem v procesu dvěma způsoby: schválení nástrojů pro řízení volání citlivých nástrojů a požadavek na informace pro pozastavení po každé odpovědi agenta za účelem shromáždění zpětné vazby.

Sekvenční orchestrace s člověkem v cyklu

Návod

Další podrobnosti o modelu požadavků a odpovědí najdete v tématu Human-in-the-Loop.

Schválení nástrojů v sekvenčních pracovních postupech

Slouží @tool(approval_mode="always_require") k označení nástrojů, které před provedením potřebují schválení člověkem. Pracovní postup se pozastaví a vygeneruje request_info událost, když se agent pokusí volat nástroj.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Zpracování streamu událostí a zpracování žádostí o schválení:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Návod

Úplný spustitelný příklad najdete v tématu sequential_builder_tool_approval.py. Schválení nástroje funguje bez SequentialBuilder jakékoli další konfigurace tvůrce.

Žádost o informace o zpětné vazbě agenta

Použijte .with_request_info() k pozastavení po odpovědi konkrétních agentů, což umožňuje externí vstup (například lidskou kontrolu) před tím, než začne další agent:

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Klíčové koncepty

  • Sdílený kontext: Každý účastník obdrží celou historii konverzací včetně všech předchozích zpráv.
  • Order Matters: Agenti provádějí úkoly výhradně v pořadí uvedeném v participants seznamu.
  • Flexibilní účastníci: Můžete kombinovat agenty a exekutory v libovolném pořadí.
  • Průběh konverzace: Každý agent nebo exekutor se připojuje k toku konverzace a tvoří kompletní dialog.
  • Schválení nástroje: Slouží @tool(approval_mode="always_require") k citlivým operacím, které potřebují kontrolu člověka.
  • Informace o žádosti: Slouží .with_request_info(agents=[...]) k pozastavení po konkrétních agentech pro externí zpětnou vazbu.

Další kroky