Megosztás:


Egyszerű párhuzamos munkafolyamat létrehozása

Ez az oktatóanyag bemutatja, hogyan hozhat létre egyidejű munkafolyamatot az Agent Framework használatával. Megtanulhatja a ki- és bekapcsolási mintákat, amelyek lehetővé teszik a párhuzamos feldolgozást, lehetővé téve több végrehajtó vagy ügynök egyidejű működését, majd az eredmények összesítését.

Mit fog felépíteni?

Létre fog hozni egy munkafolyamatot, amely:

  • Bemenetként vesz fel egy kérdést (például "Mi a hőmérséklet?")
  • Ugyanazt a kérdést küldi két szakértő AI-ügynöknek egyszerre (fizikus és vegyész)
  • Összegyűjti és egyesíti a két ügynök válaszait egyetlen kimenetben
  • Bemutatja az AI-ügynökök párhuzamos végrehajtását fan-out/fan-in minták használatával

A tárgyalt fogalmak

Előfeltételek

1. lépés: NuGet-csomagok telepítése

Először telepítse a szükséges csomagokat a .NET-projekthez:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

2. lépés: A függőségek és az Azure OpenAI beállítása

Először állítsa be a projektet a szükséges NuGet-csomagokkal és az Azure OpenAI-ügyféllel:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

3. lépés: Szakértői AI-ügynökök létrehozása

Hozzon létre két speciális AI-ügynököt, amelyek szakértői perspektívákat biztosítanak:

        // Create the AI agents with specialized expertise
        ChatClientAgent physicist = new(
            chatClient,
            name: "Physicist",
            instructions: "You are an expert in physics. You answer questions from a physics perspective."
        );

        ChatClientAgent chemist = new(
            chatClient,
            name: "Chemist",
            instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
        );

4. lépés: A végrehajtó indítása

Hozzon létre egy végrehajtót, amely több ügynöknek küldött bemenettel kezdeményezi az egyidejű feldolgozást:

        var startExecutor = new ConcurrentStartExecutor();

A ConcurrentStartExecutor megvalósítás:

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

5. lépés: Az összesítési végrehajtó létrehozása

Hozzon létre egy végrehajtót, amely több ügynök válaszait gyűjti össze és egyesíti:

        var aggregationExecutor = new ConcurrentAggregationExecutor();

A ConcurrentAggregationExecutor megvalósítás:

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
    Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.AddRange(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine,
                this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}

6. lépés: A munkafolyamat létrehozása

Csatlakoztassa a végrehajtókat és az ügynököket a fan-out és fan-in peremmintákkal.

        // Build the workflow by adding executors and connecting them
        var workflow = new WorkflowBuilder(startExecutor)
            .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
            .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
            .WithOutputFrom(aggregationExecutor)
            .Build();

7. lépés: A munkafolyamat végrehajtása

Futtassa a munkafolyamatot, és rögzítse a stream kimenetét:

        // Execute the workflow in streaming mode
        await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent output)
            {
                Console.WriteLine($"Workflow completed with results:\n{output.Data}");
            }
        }
    }
}

Hogyan működik?

  1. Fan-Out: A ConcurrentStartExecutor fogadja a bemeneti kérdést, és a fan-out él egyszerre küldi el a fizikus és a kémikus ügynököknek.
  2. Párhuzamos feldolgozás: Mindkét AI-ügynök egyszerre dolgozza fel ugyanazt a kérdést, és mindegyik a saját szakértői perspektíváját biztosítja.
  3. Fan-In: A ConcurrentAggregationExecutor két ügynök válaszait gyűjti ChatMessage össze.
  4. Összesítés: Miután mindkét válasz megérkezett, az összesítő formázott kimenetbe egyesíti őket.

Alapfogalmak

  • Fan-Out Edges: Ugyanazt AddFanOutEdge() a bemenetet több végrehajtó vagy ügynök számára is elosztja.
  • Fan-In Edges: Több forrásvégrehajító eredményeinek gyűjtésére használható AddFanInEdge() .
  • AI-ügynökintegráció: Az AI-ügynökök közvetlenül végrehajtóként használhatók a munkafolyamatokban.
  • A végrehajtó alap osztálya: Az egyéni végrehajtók a Executor<TInput> osztályból öröklődnek és felülbírálják a HandleAsync metódust.
  • Tokenek aktiválása: Használja a TurnToken-t, hogy jelezze az ügynököknek a sorban álló üzenetek feldolgozásának megkezdését.
  • Streamelés végrehajtása: Valós idejű frissítések lekérésére használható StreamAsync() a munkafolyamat előrehaladása során.

Implementáció befejezése

Ennek az egyidejű munkafolyamatnak az AI-ügynökökkel való teljes körű működéséhez tekintse meg az Egyidejű /Program.cs mintát az Agent Framework-adattárban.

A Python-implementációban egy párhuzamos munkafolyamatot fog létrehozni, amely több párhuzamos végrehajtón keresztül dolgozza fel az adatokat, és különböző típusú eredményeket összesít. Ez a példa bemutatja, hogyan kezeli a keretrendszer a vegyes eredménytípusokat az egyidejű feldolgozásból.

Mit fog felépíteni?

Létre fog hozni egy munkafolyamatot, amely:

  • Számlistát vesz fel bemenetként
  • A listát két párhuzamos végrehajtóra osztja el (egy számítási átlag, egy számítási összeg)
  • A különböző eredménytípusokat (lebegőpontos és int) összesíti egy végső kimenetben
  • Bemutatja, hogyan kezeli a keretrendszer az egyidejű végrehajtók különböző eredménytípusait

A tárgyalt fogalmak

Előfeltételek

  • Python 3.10 vagy újabb
  • Az Agent Framework Core telepítve van: pip install agent-framework-core --pre

1. lépés: Kötelező függőségek importálása

Először importálja a szükséges összetevőket az Agent Frameworkből:

import asyncio
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

2. lépés: A kézbesítő végrehajtójának létrehozása

A diszpécser feladata, hogy a kezdeti bemenetet több párhuzamos végrehajtónak ossza ki:

class Dispatcher(Executor):
    """
    The sole purpose of this executor is to dispatch the input of the workflow to
    other executors.
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

3. lépés: Párhuzamos feldolgozási végrehajtók létrehozása

Hozzon létre két végrehajtót, amelyek egyidejűleg fogják feldolgozni az adatokat:

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

4. lépés: Az összesítő végrehajtó létrehozása

Az összesítő összegyűjti a párhuzamos végrehajtók eredményeit, és a végső kimenetet adja:

class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors.

        The framework will automatically collect messages from the source executors
        and deliver them as a list.

        Args:
            results (list[int | float]): execution results from upstream executors.
                The type annotation must be a list of union types that the upstream
                executors will produce.
            ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
        """
        await ctx.yield_output(results)

5. lépés: A munkafolyamat létrehozása

A végrehajtók csatlakoztatása ki- és beszívó élminták használatával:

async def main() -> None:
    # 1) Create the executors
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

6. lépés: A munkafolyamat futtatása

Hajtsa végre a munkafolyamatot mintaadatokkal, és rögzítse a kimenetet:

    # 3) Run the workflow
    output: list[int | float] | None = None
    async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print(output)

if __name__ == "__main__":
    asyncio.run(main())

Hogyan működik?

  1. Fan-Out: A Dispatcher fogadja a bemeneti listát, és egyidejűleg elküldi a Average és Sum végrehajtóknak.
  2. Párhuzamos feldolgozás: Mindkét végrehajtó egyszerre dolgozza fel ugyanazt a bemenetet, és különböző eredménytípusokat hoz létre:
    • Averagea végrehajtó eredményt hoz létre float
    • Suma végrehajtó eredményt hoz létre int
  3. Fan-In: A Aggregator a két végrehajtótól érkező eredményeket egy mindkét típust tartalmazó listaként fogadja
  4. Típuskezelés: A keretrendszer automatikusan kezeli a különböző eredménytípusokat egyesítő típusok használatával (int | float)

Alapfogalmak

  • Fan-Out Edges: A add_fan_out_edges() segítségével ugyanaz a bemenet több végrehajtónak is elküldhető
  • Fan-In Edges: Több forrásvégrehajító eredményeinek gyűjtésére használható add_fan_in_edges()
  • Egyesítő típusok: Különböző eredménytípusok kezelése típusjegyzetekkel, például list[int | float]
  • Egyidejű végrehajtás: Több végrehajtó dolgoz fel adatokat egyszerre, javítva a teljesítményt

Implementáció befejezése

Az egyidejű munkafolyamat teljes körű implementációjához tekintse meg a aggregate_results_of_different_types.py mintát az Agent Framework-adattárban.

Következő lépések