群组聊天编排对多个代理之间的协作对话进行建模,由协调程序确定说话者选择并协调聊天流。 此模式非常适合需要迭代优化、协作解决问题或多透视分析的方案。
在内部,群组聊天编排将代理组合为星形拓扑结构,中间有一个协调程序。 协调器可以实施各种策略来选择下一个代理,例如轮循机制、基于提示的选择或基于聊天上下文的自定义逻辑,使其成为多代理协作的灵活且强大的模式。
群聊和其他模式之间的差异
与其他多代理模式相比,群聊业务流程具有不同的特征:
- 集中协调:与代理直接转移控制权的交接模式不同,群聊使用协调器来决定下一个发言者
- 迭代优化:代理可以在多个轮中查看并互相借鉴彼此的反馈进行改进
- 灵活的说话人选择:协调器可以使用各种策略(轮循机制、基于提示、自定义逻辑)来选择说话人。
- 共享上下文:所有代理都会看到完整的对话历史记录,从而实现协作优化
学习内容
- 如何为组协作创建专用代理
- 如何配置说话人选择策略
- 如何使用迭代代理优化生成工作流
- 如何使用自定义业务流程协调程序自定义聊天流
设置 Azure OpenAI 客户端
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
using Microsoft.Agents.AI;
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ??
throw new InvalidOperationException("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var client = new AzureOpenAIClient(new Uri(endpoint), new DefaultAzureCredential())
.GetChatClient(deploymentName)
.AsIChatClient();
警告
DefaultAzureCredential 对于开发来说很方便,但在生产中需要仔细考虑。 在生产环境中,请考虑使用特定凭据(例如), ManagedIdentityCredential以避免延迟问题、意外凭据探测以及回退机制的潜在安全风险。
定义代理
在组会话中为不同角色创建专用代理:
// Create a copywriter agent
ChatClientAgent writer = new(client,
"You are a creative copywriter. Generate catchy slogans and marketing copy. Be concise and impactful.",
"CopyWriter",
"A creative copywriter agent");
// Create a reviewer agent
ChatClientAgent reviewer = new(client,
"You are a marketing reviewer. Evaluate slogans for clarity, impact, and brand alignment. " +
"Provide constructive feedback or approval.",
"Reviewer",
"A marketing review agent");
使用 Round-Robin Orchestrator 配置群组聊天
使用 AgentWorkflowBuilder以下命令生成群组聊天工作流:
// Build group chat with round-robin speaker selection
// The manager factory receives the list of agents and returns a configured manager
var workflow = AgentWorkflowBuilder
.CreateGroupChatBuilderWith(agents =>
new RoundRobinGroupChatManager(agents)
{
MaximumIterationCount = 5 // Maximum number of turns
})
.AddParticipants(writer, reviewer)
.Build();
运行群组聊天工作流
执行工作流并观察迭代对话:
// Start the group chat
var messages = new List<ChatMessage> {
new(ChatRole.User, "Create a slogan for an eco-friendly electric vehicle.")
};
StreamingRun run = await InProcessExecution.StreamAsync(workflow, messages);
await run.TrySendMessageAsync(new TurnToken(emitEvents: true));
await foreach (WorkflowEvent evt in run.WatchStreamAsync().ConfigureAwait(false))
{
if (evt is AgentResponseUpdateEvent update)
{
// Process streaming agent responses
AgentResponse response = update.AsResponse();
foreach (ChatMessage message in response.Messages)
{
Console.WriteLine($"[{update.ExecutorId}]: {message.Text}");
}
}
else if (evt is WorkflowOutputEvent output)
{
// Workflow completed
var conversationHistory = output.As<List<ChatMessage>>();
Console.WriteLine("\n=== Final Conversation ===");
foreach (var message in conversationHistory)
{
Console.WriteLine($"{message.AuthorName}: {message.Text}");
}
break;
}
}
示例交互
[CopyWriter]: "Green Dreams, Zero Emissions" - Drive the future with style and sustainability.
[Reviewer]: The slogan is good, but "Green Dreams" might be a bit abstract. Consider something
more direct like "Pure Power, Zero Impact" to emphasize both performance and environmental benefit.
[CopyWriter]: "Pure Power, Zero Impact" - Experience electric excellence without compromise.
[Reviewer]: Excellent! This slogan is clear, impactful, and directly communicates the key benefits.
The tagline reinforces the message perfectly. Approved for use.
[CopyWriter]: Thank you! The final slogan is: "Pure Power, Zero Impact" - Experience electric
excellence without compromise.
设置聊天客户端
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
# Initialize the Azure OpenAI chat client
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
定义代理
创建具有不同角色的专用代理:
from agent_framework import Agent
# Create a researcher agent
researcher = Agent(
name="Researcher",
description="Collects relevant background information.",
instructions="Gather concise facts that help answer the question. Be brief and factual.",
chat_client=chat_client,
)
# Create a writer agent
writer = Agent(
name="Writer",
description="Synthesizes polished answers using gathered information.",
instructions="Compose clear, structured answers using any notes provided. Be comprehensive.",
chat_client=chat_client,
)
使用简单选择器配置群组聊天
使用自定义说话人选择逻辑生成群组聊天:
from agent_framework.orchestrations import GroupChatBuilder, GroupChatState
def round_robin_selector(state: GroupChatState) -> str:
"""A round-robin selector function that picks the next speaker based on the current round index."""
participant_names = list(state.participants.keys())
return participant_names[state.current_round % len(participant_names)]
# Build the group chat workflow
workflow = GroupChatBuilder(
participants=[researcher, writer],
termination_condition=lambda conversation: len(conversation) >= 4,
selection_func=round_robin_selector,
).build()
使用 Agent-Based Orchestrator 配置群组聊天
或者,使用基于代理的编排器进行智能音频选择。 协调器是一个完整的Agent,可访问工具、上下文和监控能力。
# Create orchestrator agent for speaker selection
orchestrator_agent = Agent(
name="Orchestrator",
description="Coordinates multi-agent collaboration by selecting speakers",
instructions="""
You coordinate a team conversation to solve the user's task.
Guidelines:
- Start with Researcher to gather information
- Then have Writer synthesize the final answer
- Only finish after both have contributed meaningfully
""",
chat_client=chat_client,
)
# Build group chat with agent-based orchestrator
workflow = GroupChatBuilder(
participants=[researcher, writer],
# Set a hard termination condition: stop after 4 assistant messages
# The agent orchestrator will intelligently decide when to end before this limit but just in case
termination_condition=lambda messages: sum(1 for msg in messages if msg.role == "assistant") >= 4,
orchestrator_agent=orchestrator_agent,
).build()
运行群组聊天工作流
执行工作流和处理事件:
from typing import cast
from agent_framework import AgentResponseUpdate, Role
task = "What are the key benefits of async/await in Python?"
print(f"Task: {task}\n")
print("=" * 80)
final_conversation: list[Message] = []
last_executor_id: str | None = None
# Run the workflow
async for event in workflow.run_stream(task):
if event.type == "output" and isinstance(event.data, AgentResponseUpdate):
# Print streaming agent updates
eid = event.executor_id
if eid != last_executor_id:
if last_executor_id is not None:
print()
print(f"[{eid}]:", end=" ", flush=True)
last_executor_id = eid
print(event.data, end="", flush=True)
elif event.type == "output":
# Workflow completed - data is a list of Message
final_conversation = cast(list[Message], event.data)
if final_conversation:
print("\n\n" + "=" * 80)
print("Final Conversation:")
for msg in final_conversation:
author = getattr(msg, "author_name", "Unknown")
text = getattr(msg, "text", str(msg))
print(f"\n[{author}]\n{text}")
print("-" * 80)
print("\nWorkflow completed.")
示例交互
Task: What are the key benefits of async/await in Python?
================================================================================
[Researcher]: Async/await in Python provides non-blocking I/O operations, enabling
concurrent execution without threading overhead. Key benefits include improved
performance for I/O-bound tasks, better resource utilization, and simplified
concurrent code structure using native coroutines.
[Writer]: The key benefits of async/await in Python are:
1. **Non-blocking Operations**: Allows I/O operations to run concurrently without
blocking the main thread, significantly improving performance for network
requests, file I/O, and database queries.
2. **Resource Efficiency**: Avoids the overhead of thread creation and context
switching, making it more memory-efficient than traditional threading.
3. **Simplified Concurrency**: Provides a clean, synchronous-looking syntax for
asynchronous code, making concurrent programs easier to write and maintain.
4. **Scalability**: Enables handling thousands of concurrent connections with
minimal resource consumption, ideal for high-performance web servers and APIs.
--------------------------------------------------------------------------------
Workflow completed.
关键概念
- 集中管理器:群组聊天使用经理协调说话人选择和流
- AgentWorkflowBuilder.CreateGroupChatBuilderWith():使用管理器工厂函数创建工作流
- RoundRobinGroupChatManager:在轮流机制中轮换发言者的内置管理器
- MaximumIterationCount:控制终止前代理的最大轮次数
-
自定义管理器:扩展
RoundRobinGroupChatManager或实现自定义逻辑 - 迭代优化:代理评审和改进彼此的贡献
- 共享上下文:所有参与者都看到完整的对话历史记录
-
灵活的业务流程协调程序策略:通过构造函数参数(
selection_func或orchestrator_agentorchestrator)在简单的选择器、基于代理的业务流程协调程序或自定义逻辑之间进行选择。 - GroupChatBuilder:创建具有可配置扬声器选择的工作流
- GroupChatState:为选择决策提供聊天状态
- 迭代协作:代理基于彼此的贡献而构建
-
事件流式处理:使用实时数据进行处理
WorkflowOutputEventAgentResponseUpdate - list[Message] 输出:所有业务流程返回聊天消息列表
高级:自定义扬声器选择
可以通过创建自定义群组聊天管理器来实现自定义管理器逻辑:
public class ApprovalBasedManager : RoundRobinGroupChatManager
{
private readonly string _approverName;
public ApprovalBasedManager(IReadOnlyList<AIAgent> agents, string approverName)
: base(agents)
{
_approverName = approverName;
}
// Override to add custom termination logic
protected override ValueTask<bool> ShouldTerminateAsync(
IReadOnlyList<ChatMessage> history,
CancellationToken cancellationToken = default)
{
var last = history.LastOrDefault();
bool shouldTerminate = last?.AuthorName == _approverName &&
last.Text?.Contains("approve", StringComparison.OrdinalIgnoreCase) == true;
return ValueTask.FromResult(shouldTerminate);
}
}
// Use custom manager in workflow
var workflow = AgentWorkflowBuilder
.CreateGroupChatBuilderWith(agents =>
new ApprovalBasedManager(agents, "Reviewer")
{
MaximumIterationCount = 10
})
.AddParticipants(writer, reviewer)
.Build();
可以根据聊天状态实现复杂的选择逻辑:
def smart_selector(state: GroupChatState) -> str:
"""Select speakers based on conversation content and context."""
conversation = state.conversation
last_message = conversation[-1] if conversation else None
# If no messages yet, start with Researcher
if not last_message:
return "Researcher"
# Check last message content
last_text = last_message.text.lower()
# If researcher finished gathering info, switch to writer
if "I have finished" in last_text and last_message.author_name == "Researcher":
return "Writer"
# Else continue with researcher until it indicates completion
return "Researcher"
workflow = GroupChatBuilder(
participants=[researcher, writer],
selection_func=smart_selector,
).build()
重要
对高级方案使用自定义实现BaseGroupChatOrchestrator时,必须设置所有属性,包括participant_registry和max_roundstermination_condition。
max_rounds 和 termination_condition 在生成器中的设置将被忽略。
上下文同步
如本指南开头所述,群组聊天中的所有代理都会看到完整的对话历史记录。
Agent Framework 中的代理依赖于代理会话(AgentSession)来管理上下文。 在群组聊天业务流程中,代理 不会 共享相同的会话实例,但业务流程协调程序可确保每个代理的会话在每次轮次之前都与完整的聊天历史记录同步。 为实现这一目标,在每个代理的回合之后,协调器将响应广播给所有其他代理,确保所有参与者在下一回合中拥有最新的上下文信息。
小窍门
代理不共享同一会话实例,因为不同的 代理类型 可能具有不同的抽象实现 AgentSession 。 共享同一会话实例可能会导致每个代理处理和维护上下文的方式不一致。
广播响应后,业务流程协调程序随后决定下一个说话人,并向所选代理发送请求,该代理现在具有完整的对话历史记录来生成其响应。
何时使用群组聊天
群组聊天管理很适合:
- 迭代优化:多轮评审和改进
- 协作问题解决:具有互补专业知识的代理协同工作
- 内容创建:用于创建文档的编写者-审阅者工作流
- 多角度分析:从相同输入获取不同视角的观点
- 质量保证:自动评审和审批流程
在以下情况下考虑替代方法:
- 您需要严格的顺序处理(使用顺序编排)
- 代理应完全独立工作(使用并发协调)
- 需要代理间的直接切换(使用 Handoff 编排)
- 需要复杂的动态规划(使用 Magentic 编排)