Orchestrace pracovních postupů architektury Microsoft Agent – sekvenční

V sekvenční orchestraci jsou agenti uspořádáni do řetězce. Každý agent zpracovává úlohu postupně a předává výstup dalšímu agentu v pořadí. To je ideální pro pracovní postupy, ve kterých každý krok vychází z předchozího kroku, jako je kontrola dokumentů, kanály zpracování dat nebo vícefázové odůvodnění.

Sekvenční orchestrace

Důležité

Ve výchozím nastavení každý agent v sekvenci využívá úplnou konverzaci předchozího agenta – jak vstupní zprávy zadané předchozímu agentovi, tak i zprávy s odpovědí. Agenty můžete nakonfigurovat tak, aby místo toho spotřebovávají pouze zprávy odpovědí předchozího agenta. Podrobnosti najdete v tématu Řízení kontextu mezi agenty .

Co se naučíte

  • Jak vytvořit sekvenční potrubí agentů
  • Jak propojit agenty, kde každý navazuje na předchozí výstup
  • Jak přidat schválení s lidským zapojením pro citlivá volání nástrojů
  • Jak kombinovat agenty s vlastními exekutory pro specializované úlohy
  • Jak sledovat tok konverzace prostřednictvím pipeline

Definování agentů

V sekvenční orchestraci jsou agenti uspořádáni do kanálu, kde každý agent zpracovává úlohu postupně a předává výstup dalšímu agentu v pořadí.

Nastavení klienta Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Výstraha

DefaultAzureCredential je vhodný pro vývoj, ale vyžaduje pečlivé zvážení v produkčním prostředí. V produkčním prostředí zvažte použití konkrétních přihlašovacích údajů (např ManagedIdentityCredential. ) k zabránění problémům s latencí, neúmyslnému testování přihlašovacích údajů a potenciálním bezpečnostním rizikům z náhradních mechanismů.

Vytvořte specializované agenty, které budou fungovat postupně:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Nastavení sekvenční orchestrace

Sestavení pracovního postupu pomocí AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Spuštění sekvenčního pracovního postupu

Spusťte pracovní postup a zpracujte události:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Ukázkový výstup

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Sekvenční orchestrace se zapojením člověka

Postupné orchestrace podporují interakce řízené lidmi prostřednictvím schvalování nástrojů. Když agenti používají nástroje zabalené ApprovalRequiredAIFunction, pracovní postup se pozastaví a vygeneruje RequestInfoEvent obsahující ToolApprovalRequestContent. Externí systémy (například lidské operátory) můžou zkontrolovat volání nástroje, schválit nebo odmítnout a pracovní postup se odpovídajícím způsobem obnoví.

Sekvenční orchestrace s lidskou kontrolou

Návod

Další podrobnosti o modelu požadavků a odpovědí najdete v tématu Human-in-the-Loop.

Definování agentů pomocí nástrojů vyžadujících schválení

Vytvořte agenty, ve kterých jsou citlivé nástroje zabalené:ApprovalRequiredAIFunction

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Sestavení a spuštění pomocí zpracování schválení

Normálně sestavte sekvenční pracovní postup. Tok schválení se zpracovává prostřednictvím streamu událostí:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Poznámka:

AgentWorkflowBuilder.BuildSequential() podporuje schválení nástroje bez nutnosti další konfigurace. Když agent volá nástroj zabalený ApprovalRequiredAIFunction, pracovní postup se automaticky pozastaví a vygeneruje RequestInfoEvent.

Návod

Kompletní spustitelný příklad procesu schvalování najdete v ukázceGroupChatToolApproval. Stejný RequestInfoEvent vzor zpracování platí i pro jiné orchestrace.

Klíčové koncepty

  • Sekvenční zpracování: Každý agent zpracovává výstup předchozího agenta v pořadí.
  • AgentWorkflowBuilder.BuildSequential(): Vytvoří sekvenční pracovní postup z kolekce agentů.
  • ChatClientAgent: Představuje agenta zálohovaného chatovacím klientem s konkrétními pokyny.
  • InProcessExecution.RunStreamingAsync(): Spustí pracovní postup a pro streamování událostí v reálném čase vrátí StreamingRun.
  • Zpracování událostí: Monitorování průběhu agenta prostřednictvím AgentResponseUpdateEvent a dokončení prostřednictvím WorkflowOutputEvent
  • Schválení nástroje: Zabalte citlivé nástroje tak, aby před provedením ApprovalRequiredAIFunction vyžadovaly lidské schválení.
  • RequestInfoEvent: Vygenerováno, když nástroj vyžaduje schválení; obsahuje ToolApprovalRequestContent podrobnosti o volání nástroje.

V sekvenční orchestraci každý agent zpracovává úlohu postupně, přičemž výstup plynule přechází z jednoho agenta na druhého. Začněte definováním agentů pro dvoufázový proces:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Nastavení sekvenční orchestrace

Třída SequentialBuilder vytvoří kanál, ve kterém agenti zpracovávají úlohy v pořadí. Každý agent vidí celou historii konverzací a přidá odpověď:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Spuštění sekvenčního pracovního postupu

Spusťte pracovní postup a shromážděte konečný výstup. Výstup terminálu je AgentResponse obsahující zprávy odpovědí posledního agenta:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Ukázkový výstup

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Pokročilé: Kombinování agentů s vlastními exekutory

Sekvenční orchestrace podporuje kombinování agentů s vlastními exekutory pro specializované zpracování. To je užitečné, když potřebujete vlastní logiku, která nevyžaduje LLM:

Definování vlastního exekutoru

Poznámka:

Když vlastní exekutor následuje agenta v sekvenci, jeho obslužná rutina obdrží AgentExecutorResponse (protože agenti jsou interně zabalení AgentExecutor). Umožňuje agent_response.full_conversation přístup k celé historii konverzací. Vlastní exekutor, který se používá jako poslední účastník (ukončovací), musí volat ctx.yield_output(AgentResponse(...)), aby se jeho výstup stal konečným výstupem pracovního postupu.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Sestavení smíšeného sekvenčního pracovního postupu

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Ukázkový výstup s využitím vlastního exekutoru

===== Final Summary =====
Summary -> users:1 assistants:1

Ovládání kontextu mezi agenty

Ve výchozím nastavení každý agent v SequentialBuilder pracovním postupu využívá úplnou konverzaci předchozího agenta (vstup a zprávy odpovědí). Nastavení chain_only_agent_responses=True konfiguruje všechny agenty v sekvenci tak, aby místo toho spotřebovávají pouze zprávy odpovědí předchozího agenta:

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

To je užitečné pro kanály překladu, progresivní upřesňování a další scénáře, ve kterých by se měl každý agent soustředit výhradně na transformaci výstupu předchozího agenta, bez vlivu na dřívější konverzační tahy.

Úplný příklad najdete v tématu sequential_chain_only_agent_responses.py v úložišti Agent Framework.

Návod

Pro podrobnější kontrolu nad kontextovým tokem, včetně vlastních funkcí filtru, si přečtěte Kontextové režimy v referenčních informacích k agent executor.

Zprostředkující výstupy

Ve výchozím nastavení se jako událost pracovního postupu output zobrazí pouze výstup posledního účastníka. Kromě intermediate_outputs=True konečného výstupu nastavíte zobrazení výstupu každého účastníka:

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()

Tyto události můžete zpracovat v reálném čase v režimu streamování:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Sekvenční orchestrace se zapojením člověka

Sekvenční orchestrace podporují interakce s člověkem v procesu dvěma způsoby: schválení nástrojů pro řízení volání citlivých nástrojů a požadavek na informace pro pozastavení po každé odpovědi agenta za účelem shromáždění zpětné vazby.

Sekvenční orchestrace s lidskou kontrolou

Návod

Další podrobnosti o modelu požadavků a odpovědí najdete v tématu Human-in-the-Loop.

Schválení nástrojů v sekvenčních pracovních postupech

Slouží @tool(approval_mode="always_require") k označení nástrojů, které před provedením potřebují schválení člověkem. Pracovní postup se pozastaví a vygeneruje request_info událost, když se agent pokusí volat nástroj.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Zpracování streamu událostí a zpracování žádostí o schválení:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Návod

Úplný spustitelný příklad najdete v tématu sequential_builder_tool_approval.py. Schválení nástroje funguje bez SequentialBuilder jakékoli další konfigurace tvůrce.

Žádost o informace o zpětné vazbě agenta

Použijte .with_request_info() k pozastavení po odpovědi konkrétních agentů, což umožňuje externí vstup (například lidskou kontrolu) před tím, než začne další agent:

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Klíčové koncepty

  • Sdílený kontext: Ve výchozím nastavení každý agent využívá úplnou konverzaci předchozího agenta, včetně vstupních a odpovědí.
  • Řízení kontextu: Slouží chain_only_agent_responses=True ke konfiguraci agentů tak, aby využívaly pouze zprávy odpovědí předchozího agenta.
  • AgentResponse Output: Výstup terminálu pracovního postupu je AgentResponse obsahující odpověď posledního agenta (ne úplnou konverzaci).
  • Order Matters: Agenti provádějí úkoly výhradně v pořadí uvedeném v participants seznamu.
  • Flexibilní účastníci: Můžete kombinovat agenty a exekutory v libovolném pořadí.
  • Vlastní ukončovací kontrakt: Vlastní exekutor použitý jako poslední účastník musí volat ctx.yield_output(AgentResponse(...)) k vytvoření výstupu terminálu.
  • Mezivýstupy: Nastavte intermediate_outputs=True tak, aby se zobrazil výstup každého účastníka jako událost postupu output, nejen výstup posledního účastníka.
  • Schválení nástroje: Slouží @tool(approval_mode="always_require") k citlivým operacím, které potřebují kontrolu člověka.
  • Informace o žádosti: Slouží .with_request_info(agents=[...]) k pozastavení po konkrétních agentech pro externí zpětnou vazbu.

Další kroky