Nota
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare ad accedere o modificare le directory.
L'accesso a questa pagina richiede l'autorizzazione. È possibile provare a modificare le directory.
L'orchestrazione simultanea consente a più agenti di lavorare sulla stessa attività in parallelo. Ogni agente elabora l'input in modo indipendente e i relativi risultati vengono raccolti e aggregati. Questo approccio è particolarmente adatto per scenari in cui prospettive o soluzioni diverse sono preziose, ad esempio brainstorming, ragionamento di insieme o sistemi di voto.
Cosa Imparerai
- Come definire più agenti con competenze diverse
- Come orchestrare questi agenti per lavorare simultaneamente in una singola attività
- Come raccogliere ed elaborare i risultati
Nell'orchestrazione simultanea, più agenti lavorano contemporaneamente sulla stessa attività e in modo indipendente, fornendo prospettive diverse sullo stesso input.
Configurare il client OpenAI di Azure
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.Projects;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AIProjectClient(new Uri(endpoint), new DefaultAzureCredential())
.GetProjectOpenAIClient()
.GetProjectResponsesClient()
.AsIChatClient(deploymentName);
Avviso
DefaultAzureCredential è utile per lo sviluppo, ma richiede un'attenta considerazione nell'ambiente di produzione. Nell'ambiente di produzione prendere in considerazione l'uso di credenziali specifiche ,ad esempio ManagedIdentityCredential, per evitare problemi di latenza, probe di credenziali indesiderate e potenziali rischi per la sicurezza dai meccanismi di fallback.
Definire gli agenti
Creare più agenti specializzati che funzioneranno contemporaneamente sulla stessa attività:
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
Configurare l'orchestrazione simultanea
Compilare il flusso di lavoro usando AgentWorkflowBuilder per eseguire gli agenti in parallelo:
// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);
Eseguire il flusso di lavoro simultaneo e raccogliere i risultati
Eseguire il flusso di lavoro ed elaborare gli eventi da tutti gli agenti in esecuzione simultaneamente:
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
await using StreamingRun run = await InProcessExecution.RunStreamingAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is AgentResponseUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Update.Text}");
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = outputEvt.As<List<ChatMessage>>()!;
break;
}
}
// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Text}");
}
Output di esempio
French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!
===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!
Concetti chiave
- Esecuzione parallela: tutti gli agenti elaborano l'input simultaneamente e in modo indipendente
- AgentWorkflowBuilder.BuildConcurrent(): crea un flusso di lavoro simultaneo da una raccolta di agenti
- Aggregazione automatica: i risultati di tutti gli agenti vengono raccolti automaticamente nel risultato finale
-
Streaming di eventi: monitoraggio in tempo reale dell'avanzamento dell'agente tramite
AgentResponseUpdateEvent - Prospettive diverse: ogni agente porta la propria esperienza unica allo stesso problema
Gli agenti sono entità specializzate in grado di elaborare le attività. Il codice seguente definisce tre agenti: un esperto di ricerca, un esperto di marketing e un esperto legale.
import os
from agent_framework.foundry import FoundryChatClient
from azure.identity import AzureCliCredential
# 1) Create three domain agents using FoundryChatClient
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
researcher = chat_client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = chat_client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = chat_client.as_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
Configurare l'orchestrazione simultanea
La ConcurrentBuilder classe consente di costruire un flusso di lavoro per eseguire più agenti in parallelo. L'elenco degli agenti viene passato come partecipanti.
from agent_framework.orchestrations import ConcurrentBuilder
# 2) Build a concurrent workflow
# Participants are either Agents (type of SupportsAgentRun) or Executors
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
Eseguire il flusso di lavoro simultaneo e raccogliere i risultati
L'aggregatore predefinito produce un singolo AgentResponse messaggio di assistente per partecipante:
from agent_framework import AgentResponse
# 3) Run with a single prompt and print the aggregated agent responses
events = await workflow.run("We are launching a new budget-friendly electric bike for urban commuters.")
outputs = events.get_outputs()
if outputs:
print("===== Final Aggregated Results =====")
final: AgentResponse = outputs[0]
for msg in final.messages:
name = msg.author_name or "assistant"
print(f"{'-' * 60}\n\n[{name}]:\n{msg.text}")
Output di esempio
===== Final Aggregated Results =====
------------------------------------------------------------
[researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
...
------------------------------------------------------------
[marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
...
------------------------------------------------------------
[legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
Avanzato: Esecutori agenti personalizzati
L'orchestrazione simultanea supporta executor personalizzati che incapsulano gli agenti con ulteriore logica. Ciò è utile quando è necessario un maggiore controllo sul modo in cui gli agenti vengono inizializzati e sul modo in cui elaborano le richieste:
Definire executor personalizzati dell'agente
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
Agent,
Executor,
WorkflowContext,
handler,
)
class ResearcherExec(Executor):
def __init__(self, chat_client: FoundryChatClient, id: str = "researcher"):
self.agent = chat_client.as_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class MarketerExec(Executor):
def __init__(self, chat_client: FoundryChatClient, id: str = "marketer"):
self.agent = chat_client.as_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name=id,
)
super().__init__(id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
Creare flussi di lavoro utilizzando gestori personalizzati
chat_client = FoundryChatClient(
project_endpoint=os.environ["FOUNDRY_PROJECT_ENDPOINT"],
model=os.environ["FOUNDRY_MODEL"],
credential=AzureCliCredential(),
)
researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)
workflow = ConcurrentBuilder(participants=[researcher, marketer, legal]).build()
Avanzato: Aggregatore personalizzato
Per impostazione predefinita, l'orchestrazione simultanea aggrega tutte le risposte dell'agente in un unico AgentResponse con un messaggio di assistente per partecipante. È possibile eseguire l'override di questo comportamento con un aggregatore personalizzato che elabora i risultati in modo specifico:
Definire un aggregatore personalizzato
from agent_framework import AgentExecutorResponse
# Create a summarizer agent for the aggregator
summarizer_agent = chat_client.as_agent(
instructions=(
"You are a helpful assistant that consolidates multiple domain expert outputs "
"into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
),
name="summarizer",
)
# Define a custom aggregator callback
async def summarize_results(results: list[AgentExecutorResponse]) -> str:
# Extract one final assistant message per agent
expert_sections: list[str] = []
for r in results:
try:
messages = getattr(r.agent_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{r.executor_id}:\n{final_text}")
except Exception as e:
expert_sections.append(f"{r.executor_id}: (error: {type(e).__name__}: {e})")
# Ask the model to synthesize a concise summary of the experts' outputs
prompt = "\n\n".join(expert_sections)
response = await summarizer_agent.run(prompt)
# Return the model's final assistant text as the completion result
return response.messages[-1].text if response.messages else ""
Creare un flusso di lavoro con l'aggregatore personalizzato
workflow = (
ConcurrentBuilder(participants=[researcher, marketer, legal])
.with_aggregator(summarize_results)
.build()
)
output = None
async for event in workflow.run("We are launching a new budget-friendly electric bike for urban commuters.", stream=True):
if event.type == "output":
output = event.data
if output:
print("===== Final Consolidated Output =====")
print(output)
Esempio di output con aggregatore personalizzato
===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.
Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.
Output intermediari
Per impostazione predefinita, solo l'output dell'aggregatore viene visualizzato come evento del flusso di lavoro output . Impostare intermediate_outputs=True per visualizzare anche l'output individuale di ogni partecipante:
workflow = ConcurrentBuilder(
participants=[researcher, marketer, legal],
intermediate_outputs=True,
).build()
È possibile gestire questi eventi in tempo reale in modalità di streaming:
from agent_framework import AgentResponseUpdate
# Track the last author to format streaming output.
last_author: str | None = None
async for event in workflow.run("Analyze our new product launch strategy.", stream=True):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
update = event.data
author = update.author_name
if author != last_author:
if last_author is not None:
print() # Newline between different authors
print(f"{author}: {update.text}", end="", flush=True)
last_author = author
else:
print(update.text, end="", flush=True)
Concetti chiave
- Esecuzione parallela: tutti gli agenti funzionano contemporaneamente e in modo indipendente sull'attività
-
AgentResponse Output: l'aggregatore predefinito restituisce un singolo
AgentResponsemessaggio di assistente per partecipante (nessun prompt utente incluso) - Prospettive diverse: ogni agente porta la propria esperienza unica allo stesso problema
- Partecipanti Flessibili: È possibile utilizzare direttamente gli agenti o incapsularli in executor personalizzati
- Elaborazione personalizzata: eseguire l'override dell'aggregatore predefinito per sintetizzare i risultati in modi specifici del dominio
-
Output intermedi: impostare
intermediate_outputs=Trueper visualizzare l'output individuale di ogni partecipante, oltre all'output finale dell'aggregatore