Microsoft Agent Framework-munkafolyamatok vezénylése – Egyidejű

Az egyidejű vezénylés lehetővé teszi, hogy több ügynök párhuzamosan dolgozzon ugyanazon a feladaton. Minden ügynök egymástól függetlenül dolgozza fel a bemenetet, és a rendszer összegyűjti és összesíti az eredményeket. Ez a megközelítés jól alkalmazható olyan helyzetekben, ahol a különböző perspektívák vagy megoldások értékesek, például ötletgyűjtés, együttes érvelés vagy szavazórendszerek.

Egyidejű vezénylés

Tudnivalók

  • Több különböző szakértelemmel rendelkező ügynök definiálása
  • Hogyan vezényelheti ezeket az ügynököket, hogy egyidejűleg működjenek egyetlen feladaton
  • Az eredmények összegyűjtése és feldolgozása

Egyidejű vezénylés esetén több ügynök dolgozik ugyanazon a feladaton egyidejűleg és egymástól függetlenül, és különböző perspektívákat biztosít ugyanarra a bemenetre.

Az Azure OpenAI-ügyfél beállítása

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;

// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
    throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
    .GetProjectOpenAIClient()
    .GetProjectResponsesClient()
    .AsIChatClient(deploymentName);

Figyelmeztetés

DefaultAzureCredential a fejlesztéshez kényelmes, de a termelési környezetben gondos megfontolást igényel. Éles környezetben fontolja meg egy adott hitelesítő adat (pl. ManagedIdentityCredential) használatát a késési problémák elkerülése, a hitelesítő adatok nem szándékos próbálgatásának és a tartalék mechanizmusokból eredő esetleges biztonsági kockázatok elkerülése érdekében.

Az ügynökök definiálása

Hozzon létre több speciális ügynököt, amelyek egyszerre dolgoznak ugyanazon a feladaton:

// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
    new(chatClient,
        $"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
        $"input by outputting the name of the input language and then translating the input to {targetLanguage}.");

// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
                         select GetTranslationAgent(lang, client));

Az egyidejű koordináció beállítása

Hozza létre a munkafolyamatot AgentWorkflowBuilder ügynökök párhuzamos futtatásához:

// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);

Futtassa az egyidejű munkafolyamatot, és gyűjtse össze az eredményeket

Futtassa a munkafolyamatot, és dolgozza fel az egyidejűleg futó összes ügynök eseményeit:

// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };

await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));

List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is AgentResponseUpdateEvent e)
    {
        Console.WriteLine($"{e.ExecutorId}: {e.Update.Text}");
    }
    else if (evt is WorkflowOutputEvent outputEvt)
    {
        result = outputEvt.As<List<ChatMessage>>()!;
        break;
    }
}

// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
    Console.WriteLine($"{message.Role}: {message.Text}");
}

Mintakimenet

French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!

===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!

Alapfogalmak

  • Párhuzamos végrehajtás: Minden ügynök egyszerre és függetlenül dolgozza fel a bemenetet
  • AgentWorkflowBuilder.BuildConcurrent(): Egyidejű munkafolyamatot hoz létre ügynökök gyűjteményéből
  • Automatikus összesítés: Az összes ügynök eredményei automatikusan bekerülnek a végeredménybe
  • Eseménystreamelés: Az ügynök előrehaladásának valós idejű monitorozása AgentResponseUpdateEvent
  • Különböző perspektívák: Minden ügynök egyedi szakértelmét ugyanarra a problémára hozza

Az ügynökök speciális entitások, amelyek képesek feladatokat feldolgozni. A következő kód három ügynököt határoz meg: egy kutatási szakértőt, egy marketingszakértőt és egy jogi szakértőt.

import os

from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential

# 1) Create three domain agents using FoundryChatClient
chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

researcher = chat_client.as_agent(
    instructions=(
        "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
        " opportunities, and risks."
    ),
    name="researcher",
)

marketer = chat_client.as_agent(
    instructions=(
        "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
        " aligned to the prompt."
    ),
    name="marketer",
)

legal = chat_client.as_agent(
    instructions=(
        "You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
        " based on the prompt."
    ),
    name="legal",
)

Az egyidejű koordináció beállítása

Az ConcurrentBuilder osztály lehetővé teszi, hogy munkafolyamatot hozzon létre több ügynök párhuzamos futtatásához. Az ügynökök listáját résztvevőként adja át.

from agent_framework.orchestrations import ConcurrentBuilder

# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()

Futtassa az egyidejű munkafolyamatot, és gyűjtse össze az eredményeket

Az alapértelmezett összesítő egyetlen AgentResponse , résztvevőnként egy segédüzenetet tartalmazó üzenetet hoz létre:

from agent_framework import AgentResponse

# 3) Run with a single prompt and print the aggregated agent responses
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()

if outputs:
    print("===== Final Aggregated Results =====")
    final: AgentResponse = outputs[0]
    for msg in final.messages:
        name = msg.author_name or "assistant"
        print(f"{'-' * 60}\n\n[{name}]:\n{msg.text}")

Mintakimenet

===== Final Aggregated Results =====
------------------------------------------------------------

[researcher]:
**Insights:**

- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
    likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
    higher fuel costs, and sustainability concerns driving adoption.
...
------------------------------------------------------------

[marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
    sustainable design—helping you conquer urban journeys without breaking the bank."
...
------------------------------------------------------------

[legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**

**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
    regarding e-bike classification, speed limits, power output, and safety features.

Speciális: Egyéni ügynök-végrehajtók

Az egyidejű vezénylés olyan egyéni végrehajtókat támogat, amelyek további logikával burkolják az ügynököket. Ez akkor hasznos, ha nagyobb mértékben kell szabályoznia az ügynökök inicializálását és a kérések feldolgozását:

Egyéni ügynök-végrehajtók definiálása

from agent_framework import (
    AgentExecutorRequest,
    AgentExecutorResponse,
    Agent,
    Executor,
    WorkflowContext,
    handler,
)

class ResearcherExec(Executor):
    def __init__(self, chat_client: FoundryChatClient, id: str = "researcher"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
                " opportunities, and risks."
            ),
            name=id,
        )
        super().__init__(id=id)

    @handler
    async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        response = await self.agent.run(request.messages)
        full_conversation = list(request.messages) + list(response.messages)
        await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))

class MarketerExec(Executor):
    def __init__(self, chat_client: FoundryChatClient, id: str = "marketer"):
        self.agent = chat_client.as_agent(
            instructions=(
                "You're a creative marketing strategist. Craft compelling value propositions and target messaging"
                " aligned to the prompt."
            ),
            name=id,
        )
        super().__init__(id=id)

    @handler
    async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
        response = await self.agent.run(request.messages)
        full_conversation = list(request.messages) + list(response.messages)
        await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))

Munkafolyamat létrehozása egyéni végrehajtókkal

chat_client = FoundryChatClient(
    project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
    model=os.environ["FOUNDRY_MODEL"],
    credential=AzureCliCredential(),
)

researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)

workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()

Fejlett: Egyéni összesítő

Alapértelmezés szerint az egyidejű orkesztráció minden ügynökválaszt egyetlen AgentResponse üzenetbe összesít, résztvevőnként egy asszisztens üzenettel. Ezt a viselkedést felülbírálhatja egy egyéni összesítővel, amely az eredményeket meghatározott módon dolgozza fel:

Egyéni összesítő definiálása

from agent_framework import AgentExecutorResponse

# Create a summarizer agent for the aggregator
summarizer_agent = chat_client.as_agent(
    instructions=(
        "You are a helpful assistant that consolidates multiple domain expert outputs "
        "into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
    ),
    name="summarizer",
)

# Define a custom aggregator callback
async def summarize_results(results: list[AgentExecutorResponse]) -> str:
    # Extract one final assistant message per agent
    expert_sections: list[str] = []
    for r in results:
        try:
            messages = getattr(r.agent_response, "messages", [])
            final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
            expert_sections.append(f"{r.executor_id}:\n{final_text}")
        except Exception as e:
            expert_sections.append(f"{r.executor_id}: (error: {type(e).__name__}: {e})")

    # Ask the model to synthesize a concise summary of the experts' outputs
    prompt = "\n\n".join(expert_sections)
    response = await summarizer_agent.run(prompt)
    # Return the model's final assistant text as the completion result
    return response.messages[-1].text if response.messages else ""

Munkafolyamat létrehozása egyéni összesítővel

workflow = (
    ConcurrentBuilder(participants=[researcher, marketer, legal])
    .with_aggregator(summarize_results)
    .build()
)

output = None
async for event in workflow.run("We are launching a new budget-friendly electric bike for urban commuters.", stream=True):
    if event.type == "output":
        output = event.data

if output:
    print("===== Final Consolidated Output =====")
    print(output)

Mintakimenet egyéni aggregátorral

===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.

Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.

Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.

Köztes kimenetek

Alapértelmezés szerint csak az aggregátor kimenete jelenik meg munkafolyamat-"output"eseményként (termináleseményként). Adja meg a(z) intermediate_output_from paraméterben azokat a résztvevőket, akiket közbenső forrásként szeretne kijelölni, hogy az egyedi kimeneteik "intermediate" eseményekként is megjelenjenek:

workflow = ConcurrentBuilder(
    participants=[researcher, marketer, legal],
    intermediate_output_from=[researcher, marketer, legal],
).build()

Ezeket az eseményeket valós időben, streamelési módban kezelheti:

from agent_framework import AgentResponseUpdate

# Track the last author to format streaming output.
last_author: str | None = None

async for event in workflow.run("Analyze our new product launch strategy.", stream=True):
    if event.type == "intermediate" and isinstance(event.data, AgentResponseUpdate):
        update = event.data
        author = update.author_name
        if author != last_author:
            if last_author is not None:
                print()  # Newline between different authors
            print(f"{author}: {update.text}", end="", flush=True)
            last_author = author
        else:
            print(update.text, end="", flush=True)

Alapfogalmak

  • Párhuzamos végrehajtás: Minden ügynök egyidejűleg és egymástól függetlenül dolgozik a feladaton
  • AgentResponse Kimenet: Az alapértelmezett összesítő egyetlen AgentResponse üzenetet ad meg résztvevőnként egy segédüzenettel (nincs benne felhasználói kérés)
  • Különböző perspektívák: Minden ügynök egyedi szakértelmét ugyanarra a problémára hozza
  • Rugalmas résztvevők: Közvetlenül használhat ügynököket, vagy becsomagolhatja őket egyéni végrehajtókba
  • Egyéni feldolgozás: Az alapértelmezett összesítő felülbírálása az eredmények tartományspecifikus módon történő szintetizálásához
  • Köztes kimenetek: A(z) intermediate_output_from=[participant, ...] átadásával az egyes felsorolt résztvevők kimenetei "intermediate" eseményekként jelennek meg, az összesítő végső "output" eseménye mellett

Következő lépések