このチュートリアルでは、Agent Framework を使用して同時実行ワークフローを作成する方法について説明します。 並列処理を可能にするファンアウト パターンとファンイン パターンを実装し、複数の Executor またはエージェントが同時に動作し、その結果を集計できるようにする方法について説明します。
あなたが構築するもの
次のワークフローを作成します。
- 質問を入力として受け取ります (例: "温度とは?)
- 同じ質問を 2 人の専門家 AI エージェントに同時に送信します (物理学者と化学者)
- 両方のエージェントからの応答を収集して 1 つの出力に結合します
- AIエージェントをファンアウト/ファンインパターンで同時実行する方法を実証します。
対象となる概念
[前提条件]
- .NET 8.0 SDK 以降
- Azure OpenAI サービスのエンドポイントとデプロイメントの構成
- Azure CLI のインストール と 認証 (Azure 資格情報認証の場合)
- 新しいコンソール アプリケーション
手順 1: NuGet パッケージをインストールする
まず、.NET プロジェクトに必要なパッケージをインストールします。
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
手順 2: 依存関係と Azure OpenAI を設定する
まず、必要な NuGet パッケージと Azure OpenAI クライアントを使用してプロジェクトを設定します。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
手順 3: エキスパート AI エージェントを作成する
専門家の視点を提供する 2 つの特殊な AI エージェントを作成します。
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
手順 4: Start Executor を作成する
複数のエージェントに入力を送信して同時実行処理を開始する Executor を作成します。
var startExecutor = new ConcurrentStartExecutor();
ConcurrentStartExecutor実装:
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
手順 5: 集計実行プログラムを作成する
複数のエージェントからの応答を収集して結合する Executor を作成します。
var aggregationExecutor = new ConcurrentAggregationExecutor();
ConcurrentAggregationExecutor実装:
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
手順 6: ワークフローを構築する
ファンアウトおよびファンイン エッジ パターンを使用して Executor とエージェントを接続します。
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
手順 7: ワークフローを実行する
ワークフローを実行し、ストリーミング出力をキャプチャします。
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
しくみ
-
ファンアウト:
ConcurrentStartExecutorは入力の質問を受け取り、ファンアウトエッジは物理学者と化学者の両方のエージェントに同時に送信します。 - 並列処理: 両方の AI エージェントが同じ質問を同時に処理し、それぞれが専門家の視点を提供します。
-
ファンイン:
ConcurrentAggregationExecutorは、両方のエージェントからChatMessage応答を収集します。 - 集計: 両方の応答を受信すると、アグリゲーターはそれらを書式設定された出力に結合します。
主要概念
-
Fan-Out エッジ:
AddFanOutEdge()を使用して、同じ入力を複数の Executor またはエージェントに配布します。 -
Fan-In エッジ:
AddFanInEdge()を使用して、複数のソースエクスキュータから結果を収集します。 - AI エージェント統合: AI エージェントは、ワークフローの Executor として直接使用できます。
-
Executor 基本クラス: カスタム Executor は
Executor<TInput>から継承し、HandleAsyncメソッドをオーバーライドします。 -
ターン トークン:
TurnTokenを使用してエージェントに通知し、キューに登録されたメッセージの処理を開始します。 -
ストリーミング実行:
StreamAsync()を使用して、ワークフローの進行に合ったリアルタイムの更新を取得します。
完全な実装
AI エージェントを使用したこの同時実行ワークフローの完全な実装については、Agent Framework リポジトリの Concurrent/Program.cs サンプルを参照してください。
Python 実装では、複数の並列実行プログラムを介してデータを処理し、さまざまな型の結果を集計する同時実行ワークフローを構築します。 この例では、フレームワークが同時処理からの混合結果の種類を処理する方法を示します。
あなたが構築するもの
次のワークフローを作成します。
- 数値の一覧を入力として受け取ります
- 2 つの並列実行プログラム (1 つは平均を計算し、1 つは合計を計算) にリストを分散します。
- さまざまな結果の種類 (float と int) を最終的な出力に集計します
- 同時実行実行プログラムとは異なる結果の種類をフレームワークが処理する方法を示します
対象となる概念
[前提条件]
- Python 3.10 以降
- Agent Framework Core がインストールされている:
pip install agent-framework-core --pre
手順 1: 必要な依存関係をインポートする
まず、Agent Framework から必要なコンポーネントをインポートします。
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
手順 2: ディスパッチャー Executor を作成する
ディスパッチャーは、最初の入力を複数の並列実行プログラムに分散する役割を担います。
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
手順 3: 並列処理実行プログラムを作成する
データを同時に処理する 2 つの Executor を作成します。
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
手順 4: アグリゲーター Executor を作成する
アグリゲーターは並列実行プログラムから結果を収集し、最終的な出力を生成します。
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
手順 5: ワークフローを構築する
ファンアウトおよびファンイン エッジ パターンを使用して Executor を接続します。
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
手順 6: ワークフローを実行する
サンプル データを使用してワークフローを実行し、出力をキャプチャします。
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
しくみ
-
ファンアウト:
Dispatcherは入力リストを受け取り、AverageとSumの両方の Executor に同時に送信します -
並列処理: 両方の実行プログラムが同じ入力を同時に処理し、異なる結果の種類を生成します。
-
Averageexecutor がfloat結果を生成する -
Sumexecutor がint結果を生成する
-
-
ファンイン:
Aggregatorは両方の Executor から両方の型を含むリストとして結果を受け取ります -
型処理: フレームワークは、共用体型 (
int | float) を使用してさまざまな結果の種類を自動的に処理します
主要概念
-
Fan-Out エッジ:
add_fan_out_edges()を使用して同じ入力を複数の Executor に送信する -
Fan-In エッジ:
add_fan_in_edges()を使用して複数のソース実行エンジンから結果を収集する -
ユニオン型: 型注釈を使用して、さまざまな結果の型を処理します
list[int | float] - 同時実行: 複数の Executor がデータを同時に処理し、パフォーマンスを向上させる
完全な実装
この同時実行ワークフローの完全な作業実装については、Agent Framework リポジトリの aggregate_results_of_different_types.py サンプルを参照してください。