Orkestreringar för Microsoft Agent Framework-arbetsflöden – samtidiga

Samtidig orkestrering gör att flera agenter kan arbeta med samma uppgift parallellt. Varje agent bearbetar indata separat och deras resultat samlas in och aggregeras. Den här metoden passar bra för scenarier där olika perspektiv eller lösningar är värdefulla, till exempel brainstorming, ensemble-resonemang eller röstningssystem.

Samtidig orkestrering

Vad du ska lära dig

  • Så här definierar du flera agenter med olika expertis
  • Så här dirigerar du dessa agenter för att arbeta samtidigt med en enda uppgift
  • Så här samlar du in och bearbetar resultaten

I samtidig orkestrering arbetar flera agenter med samma uppgift samtidigt och oberoende, vilket ger olika perspektiv på samma indata.

Konfigurera Azure OpenAI-klienten

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Varning

DefaultAzureCredential är praktiskt för utveckling men kräver noggrant övervägande i produktion. I produktion bör du överväga att använda en specifik autentiseringsuppgift (t.ex. ManagedIdentityCredential) för att undvika problem med svarstid, oavsiktlig avsökning av autentiseringsuppgifter och potentiella säkerhetsrisker från reservmekanismer.

Definiera dina agenter

Skapa flera specialiserade agenter som ska arbeta med samma uppgift samtidigt:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Konfigurera samtidig orkestrering

Skapa arbetsflödet med AgentWorkflowBuilder för att köra agenter parallellt:

// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);

Kör det samtidiga arbetsflödet och samla in resultat

Kör arbetsflödet och bearbeta händelser från alla agenter som körs samtidigt:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        Console.WriteLine($"{e.ExecutorId}: {e.Update.Text}");
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Exempelutdata

French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!

===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!

Viktiga begrepp

  • Parallell körning: Alla agenter bearbetar indata samtidigt och oberoende
  • AgentWorkflowBuilder.BuildConcurrent(): Skapar ett samtidigt arbetsflöde från en samling agenter
  • Automatisk aggregering: Resultat från alla agenter samlas automatiskt in i slutresultatet
  • Händelseströmning: Realtidsövervakning av agentförlopp via AgentResponseUpdateEvent
  • Olika perspektiv: Varje agent tar med sig sin unika expertis till samma problem

Agenter är specialiserade entiteter som kan bearbeta uppgifter. Följande kod definierar tre agenter: en forskningsexpert, en marknadsföringsexpert och en juridisk expert.

import os

from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create three domain agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

researcher = chat_client.as_agent(
    instructions=(
        "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
        " opportunities, and risks."
    ),
    name="researcher",
)

marketer = chat_client.as_agent(
    instructions=(
        "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
        " aligned to the prompt."
    ),
    name="marketer",
)

legal = chat_client.as_agent(
    instructions=(
        "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
        " based on the prompt."
    ),
    name="legal",
)

Konfigurera samtidig orkestrering

Med ConcurrentBuilder klassen kan du skapa ett arbetsflöde för att köra flera agenter parallellt. Du skickar listan med agenter som deltagare.

from agent_framework.orchestrations import ConcurrentBuilder

# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()

Kör det samtidiga arbetsflödet och samla in resultaten

Standardaggregatorn skapar en enda AgentResponse som innehåller ett assistentmeddelande per deltagare:

from agent_framework import AgentResponse

# 3) Run with a single prompt and print the aggregated agent responses
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Aggregated Results =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"{'-' * 60}\n\n[{name}]:\n{msg.text}")

Exempelutdata

===== Final Aggregated Results =====
------------------------------------------------------------

[researcher]:
**Insights:**

- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
    likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
    higher fuel costs, and sustainability concerns driving adoption.
...
------------------------------------------------------------

[marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
    sustainable design—helping you conquer urban journeys without breaking the bank."
...
------------------------------------------------------------

[legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**

**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
    regarding e-bike classification, speed limits, power output, and safety features.

Avancerat: Exekverare för anpassade agenter

Parallell orkestrering stöder anpassade exekverare som förser agenter med ytterligare logik. Detta är användbart när du behöver mer kontroll över hur agenter initieras och hur de bearbetar begäranden:

Definiera anpassade agentexekutorer

from agent_framework import (
    AgentExecutorRequest,
    AgentExecutorResponse,
    Agent,
    Executor,
    WorkflowContext,
    handler,
)

class ResearcherExec(Executor):
    def __init__(self, chat_client: FoundryChatClient, id: str = "researcher"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
                " opportunities, and risks."
            ),
            name=id,
        )
        super().__init__(id=id)

    @handler
    async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        response = await self.agent.run(request.messages)
        full_conversation = list(request.messages) + list(response.messages)
        await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))

class MarketerExec(Executor):
    def __init__(self, chat_client: FoundryChatClient, id: str = "marketer"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
                " aligned to the prompt."
            ),
            name=id,
        )
        super().__init__(id=id)

    @handler
    async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        response = await self.agent.run(request.messages)
        full_conversation = list(request.messages) + list(response.messages)
        await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))

Skapa ett arbetsflöde med anpassade körverktyg

chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)

workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()

Avancerat: Anpassad aggregerare

Som standard aggregerar samtidig orkestrering alla agentsvar till en enda AgentResponse med ett assistentmeddelande per deltagare. Du kan åsidosätta det här beteendet med en anpassad aggregator som bearbetar resultatet på ett specifikt sätt:

Definiera en anpassad aggregator

from agent_framework import AgentExecutorResponse

# Create a summarizer agent for the aggregator
summarizer_agent = chat_client.as_agent(
    instructions=(
        "You are a helpful assistant that consolidates multiple domain expert outputs "
        "into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
    ),
    name="summarizer",
)

# Define a custom aggregator callback
async def summarize_results(results: list[AgentExecutorResponse]) -> str:
    # Extract one final assistant message per agent
    expert_sections: list[str] = []
    for r in results:
        try:
            messages = getattr(r.agent_response, "messages", [])
            final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
            expert_sections.append(f"{r.executor_id}:\n{final_text}")
        except Exception as e:
            expert_sections.append(f"{r.executor_id}: (error: {type(e).__name__}: {e})")

    # Ask the model to synthesize a concise summary of the experts' outputs
    prompt = "\n\n".join(expert_sections)
    response = await summarizer_agent.run(prompt)
    # Return the model's final assistant text as the completion result
    return response.messages[-1].text if response.messages else ""

Skapa ett arbetsflöde med anpassad aggregator

workflow = (
    ConcurrentBuilder(participants=[researcher, marketer, legal])
    .with_aggregator(summarize_results)
    .build()
)

output = None
async for event in workflow.run("We are launching a new budget-friendly electric bike for urban commuters.", stream=True):
    if event.type == "output":
        output = event.data

if output:
    print("===== Final Consolidated Output =====")
    print(output)

Exempel på utdata med anpassad aggregator

===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.

Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.

Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.

Mellanliggande utdata

Som standard visas endast aggregatorns utdata som en arbetsflödeshändelse output . Ange intermediate_outputs=True för att även visa varje deltagares individuella utdata:

workflow = ConcurrentBuilder(
    participants=[researcher, marketer, legal],
    intermediate_outputs=True,
).build()

Du kan hantera dessa händelser i realtid i strömningsläge:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Analyze our new product launch strategy.", stream=True):
    if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Viktiga begrepp

  • Parallell körning: Alla agenter arbetar med uppgiften samtidigt och oberoende av varandra
  • AgentResponse-utdata: Standardaggregatorn ger en enda AgentResponse med ett assistentmeddelande per deltagare (ingen användarfråga ingår)
  • Olika perspektiv: Varje agent tar med sig sin unika expertis till samma problem
  • Flexibla deltagare: Du kan använda agenter direkt eller omsluta dem i anpassade utförare
  • Anpassad bearbetning: Åsidosätt standardaggregatorn för att syntetisera resultat på domänspecifika sätt
  • Mellanliggande utdata: Ställ in intermediate_outputs=True för att visa varje deltagares individuella utdata, utöver aggregatorns slutliga utdata

Nästa steg