Jegyzet
Az oldalhoz való hozzáférés engedélyezést igényel. Próbálhatod be jelentkezni vagy könyvtárat váltani.
Az oldalhoz való hozzáférés engedélyezést igényel. Megpróbálhatod a könyvtár váltását.
Ez az oktatóanyag bemutatja, hogyan hozhat létre egyidejű munkafolyamatot az Agent Framework használatával. Megtanulhatja a ki- és bekapcsolási mintákat, amelyek lehetővé teszik a párhuzamos feldolgozást, lehetővé téve több végrehajtó vagy ügynök egyidejű működését, majd az eredmények összesítését.
Mit fog felépíteni?
Létre fog hozni egy munkafolyamatot, amely:
- Bemenetként vesz fel egy kérdést (például "Mi a hőmérséklet?")
- Ugyanazt a kérdést küldi két szakértő AI-ügynöknek egyszerre (fizikus és vegyész)
- Összegyűjti és egyesíti a két ügynök válaszait egyetlen kimenetben
- Bemutatja az AI-ügynökök párhuzamos végrehajtását fan-out/fan-in minták használatával
A tárgyalt fogalmak
Előfeltételek
- .NET 8.0 SDK vagy újabb
- Azure OpenAI szolgáltatásvégpont és üzembe helyezés konfigurálva
- Az Azure CLI telepítve és hitelesítve (az Azure hitelesítő adatokkal való hitelesítéshez)
- Új konzolalkalmazás
1. lépés: NuGet-csomagok telepítése
Először telepítse a szükséges csomagokat a .NET-projekthez:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
2. lépés: A függőségek és az Azure OpenAI beállítása
Először állítsa be a projektet a szükséges NuGet-csomagokkal és az Azure OpenAI-ügyféllel:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
3. lépés: Szakértői AI-ügynökök létrehozása
Hozzon létre két speciális AI-ügynököt, amelyek szakértői perspektívákat biztosítanak:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
4. lépés: A végrehajtó indítása
Hozzon létre egy végrehajtót, amely több ügynöknek küldött bemenettel kezdeményezi az egyidejű feldolgozást:
var startExecutor = new ConcurrentStartExecutor();
A ConcurrentStartExecutor megvalósítás:
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
5. lépés: Az összesítési végrehajtó létrehozása
Hozzon létre egy végrehajtót, amely több ügynök válaszait gyűjti össze és egyesíti:
var aggregationExecutor = new ConcurrentAggregationExecutor();
A ConcurrentAggregationExecutor megvalósítás:
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
6. lépés: A munkafolyamat létrehozása
Csatlakoztassa a végrehajtókat és az ügynököket a fan-out és fan-in peremmintákkal.
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
7. lépés: A munkafolyamat végrehajtása
Futtassa a munkafolyamatot, és rögzítse a stream kimenetét:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Hogyan működik?
-
Fan-Out: A
ConcurrentStartExecutorfogadja a bemeneti kérdést, és a fan-out él egyszerre küldi el a fizikus és a kémikus ügynököknek. - Párhuzamos feldolgozás: Mindkét AI-ügynök egyszerre dolgozza fel ugyanazt a kérdést, és mindegyik a saját szakértői perspektíváját biztosítja.
-
Fan-In: A
ConcurrentAggregationExecutorkét ügynök válaszait gyűjtiChatMessageössze. - Összesítés: Miután mindkét válasz megérkezett, az összesítő formázott kimenetbe egyesíti őket.
Alapfogalmak
-
Fan-Out Edges: Ugyanazt
AddFanOutEdge()a bemenetet több végrehajtó vagy ügynök számára is elosztja. -
Fan-In Edges: Több forrásvégrehajító eredményeinek gyűjtésére használható
AddFanInEdge(). - AI-ügynökintegráció: Az AI-ügynökök közvetlenül végrehajtóként használhatók a munkafolyamatokban.
-
A végrehajtó alap osztálya: Az egyéni végrehajtók a
Executor<TInput>osztályból öröklődnek és felülbírálják aHandleAsyncmetódust. -
Tokenek aktiválása: Használja a
TurnToken-t, hogy jelezze az ügynököknek a sorban álló üzenetek feldolgozásának megkezdését. -
Streamelés végrehajtása: Valós idejű frissítések lekérésére használható
StreamAsync()a munkafolyamat előrehaladása során.
Implementáció befejezése
Ennek az egyidejű munkafolyamatnak az AI-ügynökökkel való teljes körű működéséhez tekintse meg az Egyidejű /Program.cs mintát az Agent Framework-adattárban.
A Python-implementációban egy párhuzamos munkafolyamatot fog létrehozni, amely több párhuzamos végrehajtón keresztül dolgozza fel az adatokat, és különböző típusú eredményeket összesít. Ez a példa bemutatja, hogyan kezeli a keretrendszer a vegyes eredménytípusokat az egyidejű feldolgozásból.
Mit fog felépíteni?
Létre fog hozni egy munkafolyamatot, amely:
- Számlistát vesz fel bemenetként
- A listát két párhuzamos végrehajtóra osztja el (egy számítási átlag, egy számítási összeg)
- A különböző eredménytípusokat (lebegőpontos és int) összesíti egy végső kimenetben
- Bemutatja, hogyan kezeli a keretrendszer az egyidejű végrehajtók különböző eredménytípusait
A tárgyalt fogalmak
Előfeltételek
- Python 3.10 vagy újabb
- Az Agent Framework Core telepítve van:
pip install agent-framework-core --pre
1. lépés: Kötelező függőségek importálása
Először importálja a szükséges összetevőket az Agent Frameworkből:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
2. lépés: A kézbesítő végrehajtójának létrehozása
A diszpécser feladata, hogy a kezdeti bemenetet több párhuzamos végrehajtónak ossza ki:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
3. lépés: Párhuzamos feldolgozási végrehajtók létrehozása
Hozzon létre két végrehajtót, amelyek egyidejűleg fogják feldolgozni az adatokat:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
4. lépés: Az összesítő végrehajtó létrehozása
Az összesítő összegyűjti a párhuzamos végrehajtók eredményeit, és a végső kimenetet adja:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
5. lépés: A munkafolyamat létrehozása
A végrehajtók csatlakoztatása ki- és beszívó élminták használatával:
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
6. lépés: A munkafolyamat futtatása
Hajtsa végre a munkafolyamatot mintaadatokkal, és rögzítse a kimenetet:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Hogyan működik?
-
Fan-Out: A
Dispatcherfogadja a bemeneti listát, és egyidejűleg elküldi aAverageésSumvégrehajtóknak. -
Párhuzamos feldolgozás: Mindkét végrehajtó egyszerre dolgozza fel ugyanazt a bemenetet, és különböző eredménytípusokat hoz létre:
-
Averagea végrehajtó eredményt hoz létrefloat -
Suma végrehajtó eredményt hoz létreint
-
-
Fan-In: A
Aggregatora két végrehajtótól érkező eredményeket egy mindkét típust tartalmazó listaként fogadja -
Típuskezelés: A keretrendszer automatikusan kezeli a különböző eredménytípusokat egyesítő típusok használatával (
int | float)
Alapfogalmak
-
Fan-Out Edges: A
add_fan_out_edges()segítségével ugyanaz a bemenet több végrehajtónak is elküldhető -
Fan-In Edges: Több forrásvégrehajító eredményeinek gyűjtésére használható
add_fan_in_edges() -
Egyesítő típusok: Különböző eredménytípusok kezelése típusjegyzetekkel, például
list[int | float] - Egyidejű végrehajtás: Több végrehajtó dolgoz fel adatokat egyszerre, javítva a teljesítményt
Implementáció befejezése
Az egyidejű munkafolyamat teljes körű implementációjához tekintse meg a aggregate_results_of_different_types.py mintát az Agent Framework-adattárban.