Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве показано, как создать параллельный рабочий процесс с помощью Agent Framework. Вы узнаете, как реализовать шаблоны расщепления и объединения, которые обеспечивают параллельную обработку, позволяя нескольким исполнителям или агентам работать одновременно с последующим объединением их результатов.
Что вы будете создавать
Вы создадите рабочий процесс, который:
- Принимает вопрос в качестве входных данных (например, "Что такое температура?")
- Отправляет тот же самый вопрос двум агентам ИИ одновременно (физику и химику)
- Собирает и объединяет ответы обоих агентов в один выход
- Демонстрирует параллельное выполнение с агентами ИИ, используя шаблоны фан-аут/фан-ин.
Основные понятия, описанные в статье
Предпосылки
- Пакет SDK для .NET 8.0 или более поздней версии
- Конечная точка и развертывание службы Azure OpenAI настроены
- Установленный и прошедший проверку подлинности Azure CLI (для проверки подлинности учетных данных Azure)
- Новое консольное приложение
Шаг 1. Установка пакетов NuGet
Сначала установите необходимые пакеты для проекта .NET:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Шаг 2. Настройка зависимостей и Azure OpenAI
Начните с настройки проекта с необходимыми пакетами NuGet и клиентом Azure OpenAI:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Шаг 3. Создание агентов экспертного ИИ
Создайте два специализированных агента ИИ, которые будут предоставлять экспертные перспективы:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Шаг 4. Создание начального исполнителя
Создайте исполнителя, который инициирует параллельную обработку, отправив входные данные нескольким агентам:
var startExecutor = new ConcurrentStartExecutor();
Реализация ConcurrentStartExecutor :
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Шаг 5. Создание исполнителя агрегирования
Создайте исполнителя, который собирает и объединяет ответы из нескольких агентов:
var aggregationExecutor = new ConcurrentAggregationExecutor();
Реализация ConcurrentAggregationExecutor :
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Шаг 6. Создание рабочего процесса
Подключите исполнителей и агентов с помощью схем разветвления и объединения.
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Шаг 7. Выполнение рабочего процесса
Запустите рабочий процесс и захватить выходные данные потоковой передачи:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Принцип работы
-
Fan-Out:
ConcurrentStartExecutorпринимает входной вопрос, и исходящий канал отправляет его как физикам, так и химикам одновременно. - Параллельная обработка: оба агента ИИ одновременно обрабатывают один и тот же вопрос, каждый из которых предоставляет свою экспертную перспективу.
-
Fan-In:
ConcurrentAggregationExecutorсобираетChatMessageответы от обоих агентов. - Агрегирование: после получения обоих ответов агрегатор объединяет их в форматированные выходные данные.
Основные понятия
-
Fan-Out edges: используйте
AddFanOutEdge()для распределения одних и того же входных данных нескольким исполнителям или агентам. -
Fan-In edges: используется
AddFanInEdge()для сбора результатов из нескольких исходных исполнителей. - Интеграция агента ИИ: агенты ИИ могут использоваться непосредственно в качестве исполнителей в рабочих процессах.
-
Базовый класс Executor: пользовательские исполнители наследуются от
Executor<TInput>и переопределяют методHandleAsync. -
Активация токенов: Используйте
TurnTokenдля сигнала агентам, чтобы начать обработку сообщений в очереди. -
Выполнение потоковой передачи: используется
StreamAsync()для получения обновлений в режиме реального времени при выполнении рабочего процесса.
Полная реализация
Полную рабочую реализацию этого параллельного процесса с агентами ИИ смотрите в примере Concurrent/Program.cs в репозитории Agent Framework.
В реализации Python вы создадите параллельный рабочий процесс, обрабатывающий данные с помощью нескольких параллельных исполнителей и агрегирует результаты различных типов. В этом примере показано, как платформа обрабатывает смешанные типы результатов из параллельной обработки.
Что вы будете создавать
Вы создадите рабочий процесс, который:
- Принимает список чисел в качестве входных данных
- Распределяет список двум параллельным исполнителям (один вычисляющий средний, один вычисляющий сумму)
- Агрегирует различные типы результатов (float и int) в окончательные выходные данные
- Показывает, как платформа обрабатывает различные типы результатов от одновременных исполнителей
Основные понятия, описанные в статье
Предпосылки
- Python 3.10 или более поздней версии
- Установлен агент Framework Core:
pip install agent-framework-core --pre
Шаг 1. Импорт требуемых зависимостей
Начните с импорта необходимых компонентов из Agent Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Шаг 2. Создание исполнителя диспетчера
Диспетчер отвечает за распределение исходных входных данных нескольким параллельным исполнителям:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Шаг 3. Создание исполнителей параллельной обработки
Создайте двух исполнителей, которые будут обрабатывать данные одновременно:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Шаг 4. Создание исполнителя агрегатора
Агрегатор собирает результаты от параллельных исполнителей и выдает окончательные выходные данные:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Шаг 5. Создание рабочего процесса
Подключите исполнителей с помощью шаблонов расширения и сужения:
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Шаг 6. Запуск рабочего процесса
Выполните поток выполнения с примерами данных и захватите выходные данные.
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Принцип работы
-
Fan-Out:
Dispatcherполучает входной список и одновременно отправляет его исполнителямAverageиSum. -
Параллельная обработка: оба исполнителя одновременно обрабатывают одни и те же входные данные, создавая различные типы результатов:
-
Averageисполнитель выдаетfloatрезультат -
SumИсполнитель выдаетintрезультат
-
-
Fan-In:
Aggregatorпринимает результаты от обоих исполнителей в виде списка, содержащего оба типа данных. -
Обработка типов: платформа автоматически обрабатывает различные типы результатов с помощью типов объединения (
int | float)
Основные понятия
-
Fan-Out edges: используется
add_fan_out_edges()для отправки одного и того же входного данных нескольким исполнителям -
Fan-In edges: используйте
add_fan_in_edges()для сбора результатов от нескольких исходных обработчиков -
Типы объединения: обработка различных типов результатов с помощью аннотаций типа, таких как
list[int | float] - Параллельное выполнение. Одновременное выполнение нескольких исполнителей обрабатывает данные, повышая производительность
Полная реализация
Для полной реализации этого параллельного рабочего процесса см. пример aggregate_results_of_different_types.py в репозитории Agent Framework.