共用方式為


建立簡單的並行工作流程

本教學課程示範如何使用代理程式架構建立並行工作流程。 您將學習實作扇出和扇入模式,以啟用平行處理,允許多個執行器或代理程式同時工作,然後彙總其結果。

您將構建什麼

您將建立工作流程,以:

  • 將問題作為輸入 (例如,「什麼是溫度?」
  • 同時將相同的問題傳送給兩個專家 AI 代理(物理學家和化學家)
  • 收集來自兩個代理的回應並將其組合成單一輸出
  • 展示使用扇出/扇入模式與 AI 代理並發執行

涵蓋概念

先決條件

步驟 1:安裝 NuGet 套件

首先,安裝 .NET 專案所需的套件:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

步驟 2:設定相依性和 Azure OpenAI

首先,使用必要的 NuGet 套件和 Azure OpenAI 用戶端來設定您的專案:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

第 3 步:創建專家 AI 代理

建立兩個專門的 AI 代理程式,提供專家觀點:

        // Create the AI agents with specialized expertise
        ChatClientAgent physicist = new(
            chatClient,
            name: "Physicist",
            instructions: "You are an expert in physics. You answer questions from a physics perspective."
        );

        ChatClientAgent chemist = new(
            chatClient,
            name: "Chemist",
            instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
        );

第 4 步:建立啟動執行器

建立執行程式,透過將輸入傳送至多個代理程式來起始並行處理:

        var startExecutor = new ConcurrentStartExecutor();

實作:ConcurrentStartExecutor

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

步驟 5:建立彙總執行程式

建立執行程式,以收集並合併來自多個代理程式的回應:

        var aggregationExecutor = new ConcurrentAggregationExecutor();

實作:ConcurrentAggregationExecutor

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
    Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.AddRange(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine,
                this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}

步驟 6:建立工作流程

使用扇出和扇入邊緣模式連接執行程式和代理程式:

        // Build the workflow by adding executors and connecting them
        var workflow = new WorkflowBuilder(startExecutor)
            .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
            .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
            .WithOutputFrom(aggregationExecutor)
            .Build();

步驟 7:執行工作流程

執行工作流程並擷取串流輸出:

        // Execute the workflow in streaming mode
        await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent output)
            {
                Console.WriteLine($"Workflow completed with results:\n{output.Data}");
            }
        }
    }
}

運作方式

  1. 扇出ConcurrentStartExecutor 接收輸入問題,扇出邊同時將其發送給物理學家和化學家代理。
  2. 並行處理:兩個人工智慧代理同時處理相同的問題,每個代理都提供自己的專家觀點。
  3. 扇入:收集ConcurrentAggregationExecutorChatMessage來自兩個代理的回應。
  4. 聚合:收到兩個回應後,聚合器將它們組合成格式化的輸出。

重要概念

  • Fan-Out Edge:用於 AddFanOutEdge() 將相同的輸入分發給多個執行器或代理程式。
  • Fan-In Edge:用於 AddFanInEdge() 從多個來源執行程式收集結果。
  • AI 代理集成: AI 代理可以直接用作工作流程中的執行器。
  • Executor 基類:自訂執行程式繼承自Executor<TInput>類別並覆寫HandleAsync方法。
  • 輪次權杖:用 TurnToken 用來向代理程式發出信號,開始處理佇列的訊息。
  • 串流執行:使用 StreamAsync() 在工作流程進展中取得即時更新。

完成實施

如需此 AI 代理程式並行工作流程的完整工作實作,請參閱代理程式架構存放庫中的 並行/Program.cs 範例。

在 Python 實作中,您將建置並行工作流程,透過多個平行執行程式處理資料,並彙總不同類型的結果。 此範例示範架構如何處理並行處理的混合結果類型。

您將構建什麼

您將建立工作流程,以:

  • 將數字清單作為輸入
  • 將清單配送至兩個平行執行程式 (一個計算平均值,一個計算總和)
  • 將不同的結果類型 (float 和 int) 彙總成最終輸出
  • 示範架構如何處理來自並行執行程式的不同結果類型

涵蓋概念

先決條件

  • Python 3.10 或更新版本
  • 已安裝的代理程式架構核心: pip install agent-framework-core --pre

步驟 1:匯入所需的依賴項

首先從代理程式框架匯入必要的元件:

import asyncio
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

步驟2:建立Dispatcher執行程式

分派器負責將初始輸入分發給多個平行執行器:

class Dispatcher(Executor):
    """
    The sole purpose of this executor is to dispatch the input of the workflow to
    other executors.
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

步驟 3:建立平行處理執行程式

建立兩個執行程式,以同時處理資料:

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

步驟 4:建立聚合器執行器

聚合器從平行執行器收集結果並產生最終輸出:

class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors.

        The framework will automatically collect messages from the source executors
        and deliver them as a list.

        Args:
            results (list[int | float]): execution results from upstream executors.
                The type annotation must be a list of union types that the upstream
                executors will produce.
            ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
        """
        await ctx.yield_output(results)

步驟 5:建立工作流程

使用扇出和扇入邊緣模式連接執行器:

async def main() -> None:
    # 1) Create the executors
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

步驟 6:執行工作流程

使用範例資料執行工作流程並擷取輸出:

    # 3) Run the workflow
    output: list[int | float] | None = None
    async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print(output)

if __name__ == "__main__":
    asyncio.run(main())

運作方式

  1. 扇出Dispatcher接收輸入清單,並同時傳送給Average執行器和Sum執行器
  2. 平行處理:兩個執行器同時處理相同的輸入,產生不同的結果類型:
    • Average執行器產生結果float
    • Sum執行器產生結果int
  3. Fan-InAggregator從兩個執行器接收結果,並合併為一個包含這兩種結果類型的清單。
  4. 類型處理:框架使用聯合類型 (int | float) 自動處理不同的結果類型

重要概念

  • 扇出邊:使用 add_fan_out_edges() 將相同的輸入傳送至多個執行單元
  • Fan-In Edge:用於 add_fan_in_edges() 從多個來源執行程式收集結果
  • 聯合類型:使用類型註釋處理不同的結果類型,例如 list[int | float]
  • 行執行:多個執行器同時處理數據,提高效能

完成實施

如需此並行工作流程的完整工作實作,請參閱代理程式架構存放庫中的 aggregate_results_of_different_types.py 範例。

後續步驟