Microsoft Agent Framework Workflow-Orchestrierungen - Sequenzielle

In sequenzieller Orchestrierung werden Agents in einer Pipeline organisiert. Jeder Agent verarbeitet die Aufgabe wiederum und übergibt die Ausgabe an den nächsten Agent in der Sequenz. Dies ist ideal für Workflows, bei denen jeder Schritt auf dem vorherigen aufbaut, z. B. Dokumentüberprüfung, Datenverarbeitungspipeline oder mehrstufiges Denken.

Sequenzielle Orchestrierung

Von Bedeutung

Standardmäßig konsumiert jeder Agent in der Sequenz die vollständige Konversation des vorherigen Agenten – sowohl die Eingabemeldungen, die dem vorherigen Agenten zur Verfügung gestellt werden, als auch dessen Antwortnachrichten. Sie können Agents so konfigurieren, dass nur die Antwortnachrichten des vorherigen Agents verwendet werden. Details finden Sie unter "Steuern des Kontexts zwischen Agents ".

Sie lernen Folgendes

  • Erstellen einer sequenziellen Pipeline von Agenten
  • So verketten Sie Agenten, bei denen jeder auf der vorherigen Ausgabe aufbaut
  • So fügen Sie die Genehmigung von Menschen in der Schleife für vertrauliche Toolaufrufe hinzu
  • So mischen Sie Agents mit benutzerdefinierten Executoren für spezialisierte Aufgaben
  • Nachverfolgen des Gesprächsverlaufs durch die Pipeline

Definieren Sie Ihre Agenten

In sequenzieller Orchestrierung werden Agents in einer Pipeline organisiert, in der jeder Agent die Aufgabe wiederum verarbeitet und die Ausgabe an den nächsten Agent in der Sequenz übergibt.

Einrichten des Azure OpenAI-Clients

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Warnung

DefaultAzureCredential ist praktisch für die Entwicklung, erfordert aber sorgfältige Überlegungen in der Produktion. Berücksichtigen Sie in der Produktion die Verwendung bestimmter Anmeldeinformationen (z. B. ManagedIdentityCredential), um Latenzprobleme, unbeabsichtigte Abfragen von Anmeldeinformationen und potenzielle Sicherheitsrisiken durch Ausweichmechanismen zu vermeiden.

Erstellen Sie spezielle Agents, die nacheinander arbeiten:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Einrichten der sequenziellen Orchestrierung

Erstellen Sie den Workflow mithilfe von AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Ausführen des sequenziellen Workflows

Führen Sie den Workflow aus, und verarbeiten Sie die Ereignisse:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Beispielausgabe

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Sequenzielle Orchestrierung mit Human-in-the-Loop

Sequenzielle Orchestrierungen unterstützen Benutzerinteraktionen durch die Toolgenehmigung. Wenn Agenten Tools verwenden, die mit ApprovalRequiredAIFunction eingeschlossen wurden, wird der Workflow angehalten und es wird ein RequestInfoEvent ausgegeben, der ein ToolApprovalRequestContent enthält. Externe Systeme (z. B. ein menschlicher Operator) können den Toolaufruf überprüfen, genehmigen oder ablehnen und der Workflow wird entsprechend fortgesetzt.

Sequenzielle Orchestrierung mit Human-in-the-Loop

Tipp

Weitere Informationen zum Anforderungs- und Antwortmodell finden Sie unter Human-in-the-Loop.

Definieren von Agenten mit genehmigungspflichtigen Tools

Erstellen Sie Agenten, bei denen vertrauliche Tools eingebunden sind: ApprovalRequiredAIFunction

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Erstellen und Ausführen mit Genehmigungsprozess

Erstellen Sie den sequenziellen Workflow wie gewohnt. Der Genehmigungsprozess wird über den Ereignisstrom gehandhabt.

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Hinweis

AgentWorkflowBuilder.BuildSequential() unterstützt die sofort einsatzbereite Toolgenehmigung – es ist keine zusätzliche Konfiguration erforderlich. Wenn ein Agent ein Tool aufruft, das mit ApprovalRequiredAIFunction umgeben ist, pausiert der Workflow automatisch und gibt ein RequestInfoEvent aus.

Tipp

Ein vollständiges runnables Beispiel für diesen Genehmigungsfluss finden Sie im GroupChatToolApproval Beispiel. Dasselbe RequestInfoEvent Behandlungsmuster gilt für andere Orchestrationen.

Wichtige Konzepte

  • Sequenzielle Verarbeitung: Jeder Agent verarbeitet die Ausgabe des vorherigen Agents in der Reihenfolge
  • AgentWorkflowBuilder.BuildSequential(): Erstellt einen Pipelineworkflow aus einer Sammlung von Agents.
  • ChatClientAgent: Stellt einen Agent dar, der von einem Chatclient mit bestimmten Anweisungen unterstützt wird.
  • InProcessExecution.RunStreamingAsync(): Führt den Workflow aus und gibt einen StreamingRun für Echtzeitereignisstreaming zurück.
  • Ereignisbehandlung: Überwachungen des Agentenfortschritts durch AgentResponseUpdateEvent und Erfassung des Abschlusses durch WorkflowOutputEvent
  • Toolgenehmigung: Umfassen Sie sensible Tools mit ApprovalRequiredAIFunction, um vor der Ausführung eine menschliche Genehmigung zu erfordern.
  • RequestInfoEvent: Wird ausgegeben, wenn ein Tool eine Genehmigung erfordert; enthält ToolApprovalRequestContent details zum Toolaufruf

Bei der sequenziellen Orchestrierung verarbeitet jeder Agent die Aufgabe nacheinander, wobei die Ausgabe von einem zum nächsten fließt. Definieren Sie zunächst Agents für einen zweistufigen Prozess:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Einrichten der sequenziellen Orchestrierung

Die SequentialBuilder Klasse erstellt eine Pipeline, in der Agents Aufgaben in der Reihenfolge verarbeiten. Jeder Agent sieht den vollständigen Unterhaltungsverlauf und fügt seine Antwort hinzu:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Ausführen des sequenziellen Workflows

Führen Sie den Workflow aus, und erfassen Sie die endgültige Ausgabe. Die Terminalausgabe ist ein AgentResponse mit den Antwortnachrichten des letzten Agenten:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Beispielausgabe

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Erweitert: Mischen von Agents mit benutzerdefinierten Executoren

Sequenzielle Orchestrierung unterstützt gemischte Agenten mit benutzerdefinierten Ausführungs-Engines für die spezialisierte Verarbeitung. Dies ist nützlich, wenn Sie benutzerdefinierte Logik benötigen, für die keine LLM erforderlich ist:

Definieren eines benutzerdefinierten Executors

Hinweis

Wenn ein benutzerdefinierter Executor einem Agenten in der Sequenz folgt, empfängt sein Handler ein AgentExecutorResponse (da Agenten intern von AgentExecutor umschlossen werden). Verwenden Sie agent_response.full_conversation, um auf den vollständigen Unterhaltungsverlauf zuzugreifen. Ein benutzerdefinierter Executor, der als letzter Teilnehmer (Terminator) verwendet wird, muss ctx.yield_output(AgentResponse(...)) aufrufen, damit seine Ausgabe zur Endausgabe des Workflows wird.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Erstellen eines gemischten sequenziellen Workflows

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Beispielausgabe mit benutzerdefiniertem Executor

===== Final Summary =====
Summary -> users:1 assistants:1

Steuerung des Kontexts zwischen Agents

Standardmäßig verwendet jeder Agent in einem SequentialBuilder Workflow die vollständige Unterhaltung des vorherigen Agents (Eingabe + Antwortnachrichten). Durch das Festlegen von chain_only_agent_responses=True wird die Sequenz aller Agents so konfiguriert, dass sie ausschließlich die Antwortnachrichten des vorherigen Agents verarbeitet.

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

Dies ist nützlich für Übersetzungspipelines, progressive Verfeinerung und andere Szenarien, in denen sich jeder Agent ausschließlich auf die Transformation der Ausgabe des vorherigen Agents konzentrieren sollte, ohne von früheren Gesprächsverläufen beeinflusst zu werden.

Ein vollständiges Beispiel finden Sie unter sequential_chain_only_agent_responses.py im Agent Framework-Repository.

Tipp

Weitere differenzierte Kontrolle über den Kontextfluss – einschließlich benutzerdefinierter Filterfunktionen – finden Sie unter Kontextmodi in der Referenz zum Agent-Executor.

Zwischenausgaben

Standardmäßig wird nur das Ergebnis des letzten Teilnehmers als Workflow-Ereignis output angezeigt. Stellen Sie intermediate_outputs=True ein, um zusätzlich zur endgültigen Ausgabe die Ausgabe jedes Teilnehmers anzuzeigen:

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()

Sie können diese Ereignisse im Streamingmodus in Echtzeit behandeln:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Sequenzielle Orchestrierung mit Human-in-the-Loop

Sequenzielle Orchestrierungen unterstützen Interaktionen mit menschlicher Beteiligung auf zwei Arten: Toolgenehmigungen für die Kontrolle vertraulicher Toolaufrufe und Informationsanfragen zum Anhalten nach jeder Agentenantwort, um Feedback zu sammeln.

Sequenzielle Orchestrierung mit Human-in-the-Loop

Tipp

Weitere Informationen zum Anforderungs- und Antwortmodell finden Sie unter Human-in-the-Loop.

Toolgenehmigung in sequenziellen Workflows

Verwenden Sie @tool(approval_mode="always_require") um Werkzeuge zu markieren, die vor der Ausführung eine menschliche Genehmigung benötigen. Der Workflow hält an und gibt ein request_info Ereignis aus, wenn der Agent versucht, das Tool aufzurufen.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Verarbeiten des Ereignisdatenstroms und Behandeln von Genehmigungsanforderungen:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Tipp

Ein vollständiges runnables Beispiel finden Sie unter sequential_builder_tool_approval.py. Die Toolfreigabe funktioniert SequentialBuilder ohne zusätzliche Erstellerkonfiguration.

Informationen zur Rückmeldung des Agenten anfordern

Verwenden Sie .with_request_info(), um nach der Reaktion bestimmter Agenten eine Pause einzulegen, sodass externe Eingaben (wie z. B. eine menschliche Prüfung) vorgenommen werden können, bevor der nächste Agent beginnt.

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Tipp

Sehen Sie sich die vollständigen Beispiele an: sequenzielle Toolgenehmigung und sequenzielle Anforderungsinformationen.

Wichtige Konzepte

  • Freigegebener Kontext: Standardmäßig verwendet jeder Agent die vollständige Unterhaltung des vorherigen Agents, einschließlich Eingabe- und Antwortnachrichten.
  • Kontextsteuerung: Wird verwendet chain_only_agent_responses=True , um Agents so zu konfigurieren, dass nur die Antwortnachrichten des vorherigen Agents verwendet werden.
  • AgentResponse-Ausgabe: Die Terminalausgabe des Workflows enthält AgentResponse die Antwort des letzten Agents (nicht die vollständige Unterhaltung)
  • Order Matters: Agenten führen streng in der participants Liste angegebenen Reihenfolge aus
  • Flexible Teilnehmer: Sie können Agents und benutzerdefinierte Executoren in beliebiger Reihenfolge kombinieren.
  • Benutzerdefinierter Terminator-Vertrag: Ein benutzerdefinierter Executor, der als letzter Teilnehmer verwendet wird, muss anrufen ctx.yield_output(AgentResponse(...)) , um die Terminalausgabe zu erzeugen.
  • Zwischenausgaben: Legen Sie fest, dass die Ausgabe jedes Teilnehmers als Workflowereignis output angezeigt wird, nicht nur die des letzten Teilnehmers
  • Toolgenehmigung: Verwendung @tool(approval_mode="always_require") für vertrauliche Vorgänge, die eine menschliche Überprüfung erfordern
  • Info anfordern: Verwenden Sie .with_request_info(agents=[...]), um nach bestimmten Agenten für externes Feedback anzuhalten

Nächste Schritte