Orkiestracje przepływów pracy programu Microsoft Agent Framework — sekwencyjne

W sekwencyjnej organizacji agenci są zorganizowani w kanałach. Każdy agent przetwarza zadanie z kolei, przekazując dane wyjściowe do następnego agenta w sekwencji. Jest to idealne rozwiązanie w przypadku przepływów pracy, w których każdy krok opiera się na poprzednim, takim jak przegląd dokumentów, potoki przetwarzania danych lub rozumowanie wieloetapowe.

Sekwencja orkiestracji

Ważne

Domyślnie każdy agent w sekwencji korzysta z pełnej konwersacji poprzedniego agenta — zarówno komunikatów wejściowych dostarczonych do poprzedniego agenta, jak i komunikatów odpowiedzi. Zamiast tego można skonfigurować agentów tak, aby używali tylko komunikatów odpowiedzi poprzedniego agenta. Aby uzyskać szczegółowe informacje, zobacz Kontrolowanie kontekstu między agentami .

Czego nauczysz się

  • Jak utworzyć sekwencyjny potok agentów
  • Jak utworzyć łańcuch agentów, w których każda z nich opiera się na poprzednich danych wyjściowych
  • Jak dodać zatwierdzenie z udziałem człowieka w cyklu dla wywołań poufnych narzędzi
  • Jak mieszać agentów z niestandardowymi wykonawcami dla wyspecjalizowanych zadań
  • Jak śledzić przepływ konwersacji przez pipeline

Definiowanie agentów

W sekwencyjnej aranżacji agenci są zorganizowani w potoku, w którym każdy agent przetwarza zadanie z kolei, przekazując dane wyjściowe do następnego agenta w sekwencji.

Konfigurowanie klienta usługi Azure OpenAI

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Ostrzeżenie

DefaultAzureCredential jest wygodne do programowania, ale wymaga starannego rozważenia w środowisku produkcyjnym. W środowisku produkcyjnym rozważ użycie określonego poświadczenia (np. ManagedIdentityCredential), aby uniknąć problemów z opóźnieniami, niezamierzonego sondowania poświadczeń i potencjalnych zagrożeń bezpieczeństwa wynikających z mechanizmów awaryjnych.

Utwórz wyspecjalizowanych agentów, którzy będą działać w sekwencji:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for sequential processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Konfigurowanie sekwencyjnej orkiestracji

Skompiluj przepływ pracy przy użyciu polecenia AgentWorkflowBuilder:

// 3) Build sequential workflow
var workflow = AgentWorkflowBuilder.BuildSequential(translationAgents);

Uruchamianie sekwencyjnego przepływu pracy

Wykonaj przepływ pracy i przetwórz zdarzenia:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

string? lastExecutorId = null;
List<ChatMessage> result = [];
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        if (e.ExecutorId != lastExecutorId)
        {
            lastExecutorId = e.ExecutorId;
            Console.WriteLine();
            Console.Write($"{e.ExecutorId}: ");
        }

        Console.Write(e.Update.Text);
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display final result
Console.WriteLine();
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Przykładowe dane wyjściowe

French_Translation: User: Hello, world!
French_Translation: Assistant: English detected. Bonjour, le monde !
Spanish_Translation: Assistant: French detected. ¡Hola, mundo!
English_Translation: Assistant: Spanish detected. Hello, world!

Sekwencyjna orkiestracja z udziałem człowieka

Sekwencyjne orkiestracje wspierają interakcje z udziałem człowieka poprzez aprobację narzędzi. Gdy agenci używają narzędzi opakowanych w polecenie ApprovalRequiredAIFunction, przepływ pracy wstrzymuje się i emituje RequestInfoEvent, który zawiera ToolApprovalRequestContent. Systemy zewnętrzne (takie jak operator ludzki) mogą sprawdzać wywołanie narzędzia, zatwierdzać je lub odrzucać, a przepływ pracy zostaje odpowiednio wznowiony.

Sekwencyjne orkiestracja za pomocą pętli human-in-the-loop

Wskazówka

Aby uzyskać więcej informacji na temat modelu żądania i odpowiedzi, zobacz Human-in-the-Loop.

Definiowanie agentów za pomocą narzędzi wymagających zatwierdzenia

Utwórz agentów, w których poufne narzędzia są opakowane za pomocą polecenia ApprovalRequiredAIFunction:

ChatClientAgent deployAgent = new(
    client,
    "You are a DevOps engineer. Check staging status first, then deploy to production.",
    "DeployAgent",
    "Handles deployments",
    [
        AIFunctionFactory.Create(CheckStagingStatus),
        new ApprovalRequiredAIFunction(AIFunctionFactory.Create(DeployToProduction))
    ]);

ChatClientAgent verifyAgent = new(
    client,
    "You are a QA engineer. Verify that the deployment was successful and summarize the results.",
    "VerifyAgent",
    "Verifies deployments");

Kompilowanie i uruchamianie przy użyciu obsługi zatwierdzania

Skompiluj przepływ pracy sekwencyjny normalnie. Przepływ zatwierdzania jest obsługiwany za pośrednictwem strumienia zdarzeń:

var workflow = AgentWorkflowBuilder.BuildSequential([deployAgent, verifyAgent]);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is RequestInfoEvent e &&
        e.Request.TryGetDataAs(out ToolApprovalRequestContent? approvalRequest))
    {
        await run.SendResponseAsync(
            e.Request.CreateResponse(approvalRequest.CreateResponse(approved: true)));
    }
}

Uwaga / Notatka

AgentWorkflowBuilder.BuildSequential() obsługuje domyślne zatwierdzanie narzędzi — nie jest wymagana żadna dodatkowa konfiguracja. Gdy agent wywołuje narzędzie owinięte w ApprovalRequiredAIFunction, przepływ pracy automatycznie wstrzymuje się i emituje RequestInfoEvent sygnał.

Wskazówka

Pełny przykład działania tego przepływu zatwierdzania można znaleźć w przykładzieGroupChatToolApproval. Ten sam RequestInfoEvent wzorzec obsługi ma zastosowanie do innych orkiestracji.

Kluczowe pojęcia

  • Przetwarzanie sekwencyjne: każdy agent przetwarza dane wyjściowe poprzedniego agenta w kolejności
  • AgentWorkflowBuilder.BuildSequential(): Tworzy sekwencyjny przepływ pracy z kolekcji agentów
  • ChatClientAgent: reprezentuje agenta wspieranego przez klienta czatu z określonymi instrukcjami
  • InProcessExecution.RunStreamingAsync(): Uruchamia przepływ pracy i zwraca element StreamingRun do przesyłania strumieniowego zdarzeń w czasie rzeczywistym
  • Obsługa zdarzeń: Monitorowanie postępu agenta za pomocą AgentResponseUpdateEvent oraz ukończenie za pomocą WorkflowOutputEvent
  • Zatwierdzanie narzędzi: owinięcie poufnych narzędzi za pomocą ApprovalRequiredAIFunction w celu wymagania zatwierdzenia przez człowieka przed wykonaniem
  • RequestInfoEvent: emitowany, gdy narzędzie wymaga zatwierdzenia; zawiera ToolApprovalRequestContent ze szczegółami wywołania narzędzia

W sekwencyjnej aranżacji każdy agent przetwarza zadanie z kolei, a dane wyjściowe przepływają od jednego do następnego. Zacznij od zdefiniowania agentów dla dwuetapowego procesu:

import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

writer = chat_client.as_agent(
    instructions=(
        "You are a concise copywriter. Provide a single, punchy marketing sentence based on the prompt."
    ),
    name="writer",
)

reviewer = chat_client.as_agent(
    instructions=(
        "You are a thoughtful reviewer. Give brief feedback on the previous assistant message."
    ),
    name="reviewer",
)

Konfigurowanie sekwencyjnej orkiestracji

Klasa SequentialBuilder tworzy strumień, w którym moduły przetwarzają zadania według kolejności. Każdy agent widzi pełną historię konwersacji i dodaje odpowiedź:

from agent_framework.orchestrations import SequentialBuilder

# 2) Build sequential workflow: writer -> reviewer
workflow = SequentialBuilder(participants=[writer, reviewer]).build()

Uruchamianie sekwencyjnego przepływu pracy

Wykonaj przepływ pracy i zbierz końcowe dane wyjściowe. Dane wyjściowe terminalu to AgentResponse zawierające komunikaty odpowiedzi ostatniego agenta:

from agent_framework import AgentResponse

# 3) Run and print the last agent's response
events = await workflow.run("Write a tagline for a budget-friendly eBike.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Response =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"[{name}]\n{msg.text}")

Przykładowe dane wyjściowe

===== Final Response =====
[reviewer]
This tagline clearly communicates affordability and the benefit of extended travel, making it
appealing to budget-conscious consumers. It has a friendly and motivating tone, though it could
be slightly shorter for more punch. Overall, a strong and effective suggestion!

Zaawansowane: Łączenie agentów z niestandardowymi wykonawcami

Sekwencyjna orkiestracja obsługuje mieszanie agentów z niestandardowymi wykonawcami dla wyspecjalizowanego przetwarzania. Jest to przydatne, gdy potrzebna jest logika niestandardowa, która nie wymaga modułu LLM:

Definiowanie niestandardowego modułu wykonawczego

Uwaga / Notatka

Gdy niestandardowy wykonawca następuje po agencie w sekwencji, jego obsługujący otrzymuje AgentExecutorResponse (ponieważ agenci są wewnętrznie opakowani przez AgentExecutor). Użyj polecenia agent_response.full_conversation , aby uzyskać dostęp do pełnej historii konwersacji. Niestandardowy wykonawca używany jako ostatni uczestnik (terminator) musi wywołać ctx.yield_output(AgentResponse(...)), aby jego dane wyjściowe stały się danymi zakończeniowymi procesu roboczego.

from agent_framework import AgentExecutorResponse, AgentResponse, Executor, WorkflowContext, handler
from agent_framework import Message
from typing_extensions import Never

class Summarizer(Executor):
    """Terminator custom executor: consumes full conversation and yields a summary as the workflow's final answer."""

    @handler
    async def summarize(
        self,
        agent_response: AgentExecutorResponse,
        ctx: WorkflowContext[Never, AgentResponse]
    ) -> None:
        if not agent_response.full_conversation:
            await ctx.yield_output(AgentResponse(messages=[Message("assistant", ["No conversation to summarize."])]))
            return

        users = sum(1 for m in agent_response.full_conversation if m.role == "user")
        assistants = sum(1 for m in agent_response.full_conversation if m.role == "assistant")
        summary = Message("assistant", [f"Summary -> users:{users} assistants:{assistants}"])
        await ctx.yield_output(AgentResponse(messages=[summary]))

Tworzenie mieszanego sekwencyjnego przepływu pracy

# Create a content agent
content = chat_client.as_agent(
    instructions="Produce a concise paragraph answering the user's request.",
    name="content",
)

# Build sequential workflow: content -> summarizer
summarizer = Summarizer(id="summarizer")
workflow = SequentialBuilder(participants=[content, summarizer]).build()

Przykładowe dane wyjściowe z niestandardowym wykonawcą

===== Final Summary =====
Summary -> users:1 assistants:1

Zarządzanie kontekstem między agentami

Domyślnie każdy agent w procesie SequentialBuilder korzysta z pełnej konwersacji poprzedniego agenta (wiadomości wejściowe oraz odpowiedzi). Ustawienie chain_only_agent_responses=True ustawia wszystkich agentów w sekwencji tak, aby konsumowali wyłącznie komunikaty odpowiedzi poprzedniego agenta:

workflow = SequentialBuilder(
    participants=[writer, translator, reviewer],
    chain_only_agent_responses=True,
).build()

Jest to przydatne w przypadku potoków tłumaczenia, stopniowego uściślenia i innych scenariuszy, w których każdy agent powinien skupić się wyłącznie na przekształcaniu danych wyjściowych poprzedniego agenta bez wpływu wcześniejszych etapów konwersacji.

Aby zapoznać się z kompletnym przykładem, zobacz sequential_chain_only_agent_responses.py w repozytorium Agent Framework.

Wskazówka

Aby uzyskać bardziej szczegółową kontrolę nad przepływem kontekstu — w tym funkcjami niestandardowego filtru — zobacz Tryby kontekstu w odniesieniu do Agenta Wykonawczego.

Dane wyjściowe pośrednie

Domyślnie tylko wyniki ostatniego uczestnika pojawiają się jako zdarzenie przepływu pracy output. Ustaw intermediate_outputs=True tak, aby ujawnić dane wyjściowe każdego uczestnika, oprócz danych końcowego wyniku.

workflow = SequentialBuilder(
    participants=[writer, reviewer, editor],
    intermediate_outputs=True,
).build()

Te zdarzenia można obsługiwać w czasie rzeczywistym w trybie przesyłania strumieniowego:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Write a tagline for a budget-friendly eBike.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Sekwencyjna orkiestracja z udziałem człowieka

Sekwencyjne aranżacje obsługują interakcje człowieka w pętli na dwa sposoby: zatwierdzanie narzędzi do kontrolowania poufnych wywołań narzędzi i żądanie wstrzymania informacji po każdej odpowiedzi agenta w celu zebrania opinii.

Sekwencyjne orkiestracja za pomocą pętli human-in-the-loop

Wskazówka

Aby uzyskać więcej informacji na temat modelu żądania i odpowiedzi, zobacz Human-in-the-Loop.

Zatwierdzanie narzędzi w sekwencyjnych przepływach pracy

Użyj @tool(approval_mode="always_require") do oznaczania narzędzi, które wymagają zatwierdzenia przez człowieka przed wykonaniem. Przepływ pracy wstrzymuje się i emituje request_info zdarzenie, gdy agent próbuje wywołać narzędzie.

@tool(approval_mode="always_require")
def execute_database_query(query: str) -> str:
    return f"Query executed successfully: {query}"


database_agent = Agent(
    client=chat_client,
    name="DatabaseAgent",
    instructions="You are a database assistant.",
    tools=[execute_database_query],
)

workflow = SequentialBuilder(participants=[database_agent]).build()

Przetwarzanie strumienia zdarzeń i obsługa żądań zatwierdzenia:

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info" and event.data.type == "function_approval_request":
            responses[event.request_id] = event.data.to_function_approval_response(approved=True)
    return responses if responses else None

stream = workflow.run("Check the schema and update all pending orders", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Wskazówka

Aby uzyskać pełny przykład możliwych do uruchomienia, zobacz sequential_builder_tool_approval.py. Zatwierdzanie narzędzi działa z SequentialBuilder bez żadnej dodatkowej konfiguracji konstruktora.

Żądanie informacji o opinii agenta

Użyj .with_request_info() do zatrzymania po odpowiedziach określonych agentów, umożliwiając zewnętrzne dane wejściowe (takie jak przegląd przez człowieka) przed rozpoczęciem następnego agenta.

drafter = Agent(
    client=chat_client,
    name="drafter",
    instructions="You are a document drafter. Create a brief draft on the given topic.",
)

editor = Agent(
    client=chat_client,
    name="editor",
    instructions="You are an editor. Review and improve the draft. Incorporate any human feedback.",
)

finalizer = Agent(
    client=chat_client,
    name="finalizer",
    instructions="You are a finalizer. Create a polished final version.",
)

# Enable request info for the editor agent only
workflow = (
    SequentialBuilder(participants=[drafter, editor, finalizer])
    .with_request_info(agents=["editor"])
    .build()
)

async def process_event_stream(stream):
    responses = {}
    async for event in stream:
        if event.type == "request_info":
            responses[event.request_id] = AgentRequestInfoResponse.approve()
    return responses if responses else None

stream = workflow.run("Write a brief introduction to artificial intelligence.", stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Kluczowe pojęcia

  • Kontekst udostępniony: domyślnie każdy agent używa pełnej konwersacji poprzedniego agenta, w tym komunikatów wejściowych i odpowiedzi
  • Kontrola kontekstu: służy chain_only_agent_responses=True do konfigurowania agentów do korzystania tylko z komunikatów odpowiedzi poprzedniego agenta
  • Wyjście AgentResponse: Terminalne wyjście przepływu pracy to AgentResponse zawierający odpowiedź ostatniego agenta (nie pełną konwersację)
  • Order Matters: Agenci wykonują ściśle w kolejności określonej w participants liście
  • Elastyczni uczestnicy: możesz mieszać agentów i niestandardowych wykonawców w dowolnej kolejności
  • Niestandardowy kontrakt terminatora: Niestandardowy wykonawca używany jako ostatni uczestnik musi wywołać ctx.yield_output(AgentResponse(...)) aby wygenerować wyjściowe dane terminalowe
  • Dane wyjściowe pośrednie: ustaw intermediate_outputs=True , aby wyświetlić dane wyjściowe każdego uczestnika jako zdarzenie przepływu pracy output , a nie tylko ostatniego uczestnika
  • Aprobata narzędzia: Używaj @tool(approval_mode="always_require") do obsługi wrażliwych operacji wymagających przeglądu przez człowieka
  • Żądaj informacji: wykorzystaj .with_request_info(agents=[...]) , aby zatrzymać się przy określonych agentach w celu uzyskania opinii zewnętrznej

Dalsze kroki