동시 오케스트레이션을 사용하면 여러 에이전트가 동일한 작업에서 병렬로 작업할 수 있습니다. 각 에이전트는 입력을 독립적으로 처리하고 결과를 수집 및 집계합니다. 이 접근 방식은 브레인스토밍, 앙상블 추론 또는 투표 시스템과 같은 다양한 관점이나 솔루션이 중요한 시나리오에 적합합니다.
학습 내용
- 다양한 전문 지식을 갖춘 여러 에이전트를 정의하는 방법
- 단일 작업에서 동시에 작동하도록 이러한 에이전트를 오케스트레이션하는 방법
- 결과를 수집하고 처리하는 방법
동시 오케스트레이션에서 여러 에이전트는 동일한 작업을 동시에 독립적으로 작업하여 동일한 입력에 대한 다양한 관점을 제공합니다.
Azure OpenAI 클라이언트 설정
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// 1) Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName)
.AsIChatClient();
에이전트 설정
동일한 작업을 동시에 수행할 여러 특수 에이전트를 만듭니다.
// 2) Helper method to create translation agents
static ChatClientAgent GetTranslationAgent(string targetLanguage, IChatClient chatClient) =>
new(chatClient,
$"You are a translation assistant who only responds in {targetLanguage}. Respond to any " +
$"input by outputting the name of the input language and then translating the input to {targetLanguage}.");
// Create translation agents for concurrent processing
var translationAgents = (from lang in (string[])["French", "Spanish", "English"]
select GetTranslationAgent(lang, client));
동시 오케스트레이션 설정
에이전트를 AgentWorkflowBuilder 병렬로 실행하는 워크플로를 빌드합니다.
// 3) Build concurrent workflow
var workflow = AgentWorkflowBuilder.BuildConcurrent(translationAgents);
동시 워크플로 실행 및 결과 수집
워크플로를 실행하고 동시에 실행되는 모든 에이전트에서 이벤트를 처리합니다.
// 4) Run the workflow
var messages = new List<ChatMessage> { new(ChatRole.User, "Hello, world!") };
StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
List<ChatMessage> result = new();
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is AgentRunUpdateEvent e)
{
Console.WriteLine($"{e.ExecutorId}: {e.Data}");
}
else if (evt is WorkflowOutputEvent outputEvt)
{
result = (List<ChatMessage>)outputEvt.Data!;
break;
}
}
// Display aggregated results from all agents
Console.WriteLine("===== Final Aggregated Results =====");
foreach (var message in result)
{
Console.WriteLine($"{message.Role}: {message.Content}");
}
샘플 출력
French_Agent: English detected. Bonjour, le monde !
Spanish_Agent: English detected. ¡Hola, mundo!
English_Agent: English detected. Hello, world!
===== Final Aggregated Results =====
User: Hello, world!
Assistant: English detected. Bonjour, le monde !
Assistant: English detected. ¡Hola, mundo!
Assistant: English detected. Hello, world!
주요 개념
- 병렬 실행: 모든 에이전트는 입력을 동시에 독립적으로 처리합니다.
- AgentWorkflowBuilder.BuildConcurrent(): 에이전트 컬렉션에서 동시 워크플로를 만듭니다.
- 자동 집계: 모든 에이전트의 결과가 최종 결과로 자동으로 수집됩니다.
-
이벤트 스트리밍: 에이전트 진행률에 대한 실시간 모니터링
AgentRunUpdateEvent - 다양한 관점: 각 에이전트는 동일한 문제에 고유한 전문 지식을 제공합니다.
에이전트는 작업을 처리할 수 있는 특수 엔터티입니다. 여기서는 연구 전문가, 마케팅 전문가 및 법률 전문가의 세 가지 에이전트를 정의합니다.
from agent_framework.azure import AzureChatClient
# 1) Create three domain agents using AzureChatClient
chat_client = AzureChatClient(credential=AzureCliCredential())
researcher = chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name="researcher",
)
marketer = chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name="marketer",
)
legal = chat_client.create_agent(
instructions=(
"You're a cautious legal/compliance reviewer. Highlight constraints, disclaimers, and policy concerns"
" based on the prompt."
),
name="legal",
)
동시 오케스트레이션 설정
클래스 ConcurrentBuilder 를 사용하면 여러 에이전트를 병렬로 실행하는 워크플로를 생성할 수 있습니다. 에이전트 목록을 참가자로 전달합니다.
from agent_framework import ConcurrentBuilder
# 2) Build a concurrent workflow
# Participants are either Agents (type of AgentProtocol) or Executors
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
동시 워크플로 실행 및 결과 수집
from agent_framework import ChatMessage, WorkflowOutputEvent
# 3) Run with a single prompt, stream progress, and pretty-print the final combined messages
output_evt: WorkflowOutputEvent | None = None
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, WorkflowOutputEvent):
output_evt = event
if output_evt:
print("===== Final Aggregated Conversation (messages) =====")
messages: list[ChatMessage] | Any = output_evt.data
for i, msg in enumerate(messages, start=1):
name = msg.author_name if msg.author_name else "user"
print(f"{'-' * 60}\n\n{i:02d} [{name}]:\n{msg.text}")
샘플 출력
Sample Output:
===== Final Aggregated Conversation (messages) =====
------------------------------------------------------------
01 [user]:
We are launching a new budget-friendly electric bike for urban commuters.
------------------------------------------------------------
02 [researcher]:
**Insights:**
- **Target Demographic:** Urban commuters seeking affordable, eco-friendly transport;
likely to include students, young professionals, and price-sensitive urban residents.
- **Market Trends:** E-bike sales are growing globally, with increasing urbanization,
higher fuel costs, and sustainability concerns driving adoption.
- **Competitive Landscape:** Key competitors include brands like Rad Power Bikes, Aventon,
Lectric, and domestic budget-focused manufacturers in North America, Europe, and Asia.
- **Feature Expectations:** Customers expect reliability, ease-of-use, theft protection,
lightweight design, sufficient battery range for daily city commutes (typically 25-40 miles),
and low-maintenance components.
**Opportunities:**
- **First-time Buyers:** Capture newcomers to e-biking by emphasizing affordability, ease of
operation, and cost savings vs. public transit/car ownership.
...
------------------------------------------------------------
03 [marketer]:
**Value Proposition:**
"Empowering your city commute: Our new electric bike combines affordability, reliability, and
sustainable design—helping you conquer urban journeys without breaking the bank."
**Target Messaging:**
*For Young Professionals:*
...
------------------------------------------------------------
04 [legal]:
**Constraints, Disclaimers, & Policy Concerns for Launching a Budget-Friendly Electric Bike for Urban Commuters:**
**1. Regulatory Compliance**
- Verify that the electric bike meets all applicable federal, state, and local regulations
regarding e-bike classification, speed limits, power output, and safety features.
- Ensure necessary certifications (e.g., UL certification for batteries, CE markings if sold internationally) are obtained.
**2. Product Safety**
- Include consumer safety warnings regarding use, battery handling, charging protocols, and age restrictions.
고급: 사용자 지정 에이전트 실행기
동시 오케스트레이션은 에이전트를 추가 논리로 래핑하는 사용자 지정 실행기를 지원합니다. 이는 에이전트가 초기화되는 방법과 에이전트가 요청을 처리하는 방법을 더 자세히 제어해야 하는 경우에 유용합니다.
사용자 지정 에이전트 실행기 정의
from agent_framework import (
AgentExecutorRequest,
AgentExecutorResponse,
ChatAgent,
Executor,
WorkflowContext,
handler,
)
class ResearcherExec(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "researcher"):
agent = chat_client.create_agent(
instructions=(
"You're an expert market and product researcher. Given a prompt, provide concise, factual insights,"
" opportunities, and risks."
),
name=id,
)
super().__init__(agent=agent, id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
class MarketerExec(Executor):
agent: ChatAgent
def __init__(self, chat_client: AzureChatClient, id: str = "marketer"):
agent = chat_client.create_agent(
instructions=(
"You're a creative marketing strategist. Craft compelling value propositions and target messaging"
" aligned to the prompt."
),
name=id,
)
super().__init__(agent=agent, id=id)
@handler
async def run(self, request: AgentExecutorRequest, ctx: WorkflowContext[AgentExecutorResponse]) -> None:
response = await self.agent.run(request.messages)
full_conversation = list(request.messages) + list(response.messages)
await ctx.send_message(AgentExecutorResponse(self.id, response, full_conversation=full_conversation))
사용자 지정 실행기를 사용하여 워크플로 빌드
chat_client = AzureChatClient(credential=AzureCliCredential())
researcher = ResearcherExec(chat_client)
marketer = MarketerExec(chat_client)
legal = LegalExec(chat_client)
workflow = ConcurrentBuilder().participants([researcher, marketer, legal]).build()
고급: 사용자 지정 집계
기본적으로 동시 오케스트레이션은 모든 에이전트 응답을 메시지 목록으로 집계합니다. 특정 방식으로 결과를 처리하는 사용자 지정 집계를 사용하여 이 동작을 재정의할 수 있습니다.
사용자 지정 집계 정의
# Define a custom aggregator callback that uses the chat client to summarize
async def summarize_results(results: list[Any]) -> str:
# Extract one final assistant message per agent
expert_sections: list[str] = []
for r in results:
try:
messages = getattr(r.agent_run_response, "messages", [])
final_text = messages[-1].text if messages and hasattr(messages[-1], "text") else "(no content)"
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}:\n{final_text}")
except Exception as e:
expert_sections.append(f"{getattr(r, 'executor_id', 'expert')}: (error: {type(e).__name__}: {e})")
# Ask the model to synthesize a concise summary of the experts' outputs
system_msg = ChatMessage(
Role.SYSTEM,
text=(
"You are a helpful assistant that consolidates multiple domain expert outputs "
"into one cohesive, concise summary with clear takeaways. Keep it under 200 words."
),
)
user_msg = ChatMessage(Role.USER, text="\n\n".join(expert_sections))
response = await chat_client.get_response([system_msg, user_msg])
# Return the model's final assistant text as the completion result
return response.messages[-1].text if response.messages else ""
사용자 지정 집계를 사용하여 워크플로 빌드
workflow = (
ConcurrentBuilder()
.participants([researcher, marketer, legal])
.with_aggregator(summarize_results)
.build()
)
output_evt: WorkflowOutputEvent | None = None
async for event in workflow.run_stream("We are launching a new budget-friendly electric bike for urban commuters."):
if isinstance(event, WorkflowOutputEvent):
output_evt = event
if output_evt:
print("===== Final Consolidated Output =====")
print(output_evt.data)
사용자 지정 집계를 사용한 샘플 출력
===== Final Consolidated Output =====
Urban e-bike demand is rising rapidly due to eco-awareness, urban congestion, and high fuel costs,
with market growth projected at a ~10% CAGR through 2030. Key customer concerns are affordability,
easy maintenance, convenient charging, compact design, and theft protection. Differentiation opportunities
include integrating smart features (GPS, app connectivity), offering subscription or leasing options, and
developing portable, space-saving designs. Partnering with local governments and bike shops can boost visibility.
Risks include price wars eroding margins, regulatory hurdles, battery quality concerns, and heightened expectations
for after-sales support. Accurate, substantiated product claims and transparent marketing (with range disclaimers)
are essential. All e-bikes must comply with local and federal regulations on speed, wattage, safety certification,
and labeling. Clear warranty, safety instructions (especially regarding batteries), and inclusive, accessible
marketing are required. For connected features, data privacy policies and user consents are mandatory.
Effective messaging should target young professionals, students, eco-conscious commuters, and first-time buyers,
emphasizing affordability, convenience, and sustainability. Slogan suggestion: "Charge Ahead—City Commutes Made
Affordable." Legal review in each target market, compliance vetting, and robust customer support policies are
critical before launch.
주요 개념
- 병렬 실행: 모든 에이전트가 동시에 독립적으로 작업합니다.
- 결과 집계: 결과가 수집되고 기본 또는 사용자 지정 집계를 통해 처리할 수 있습니다.
- 다양한 관점: 각 에이전트는 동일한 문제에 고유한 전문 지식을 제공합니다.
- 유연한 참가자: 에이전트를 직접 사용하거나 사용자 지정 실행기에서 래핑할 수 있습니다.
- 사용자 지정 처리: 도메인별 방식으로 결과를 합성하도록 기본 집계 재정의