Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Deze zelfstudie laat zien hoe u een gelijktijdige werkstroom maakt met behulp van Agent Framework. U leert hoe u fan-out- en fan-inpatronen implementeert die parallelle verwerking mogelijk maken, zodat meerdere uitvoerders of agents tegelijkertijd kunnen werken en vervolgens hun resultaten aggregeren.
Wat je gaat bouwen
U maakt een werkstroom die:
- Neemt een vraag als invoer (bijvoorbeeld 'Wat is temperatuur?')
- Stuurt dezelfde vraag naar twee deskundige AI-agents tegelijk (fysicus en chemicus)
- Verzamelt en combineert antwoorden van beide agents in één uitvoer
- Demonstreert gelijktijdige uitvoering met AI-agenten via fan-out/fan-in-patronen
Behandelde concepten
Vereiste voorwaarden
- .NET 8.0 SDK of hoger
- Azure OpenAI-service-eindpunt en -implementatie geconfigureerd
- Azure CLI geïnstalleerd en geverifieerd (voor Azure-referentieverificatie)
- Een nieuwe consoletoepassing
Stap 1: NuGet-pakketten installeren
Installeer eerst de vereiste pakketten voor uw .NET-project:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Stap 2: Afhankelijkheden en Azure OpenAI instellen
Begin met het instellen van uw project met de vereiste NuGet-pakketten en De Azure OpenAI-client:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Stap 3: Expert AI-agents maken
Maak twee gespecialiseerde AI-agents die deskundige perspectieven bieden:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Stap 4: De startuitvoering maken
Maak een uitvoerprogramma waarmee de gelijktijdige verwerking wordt gestart door invoer naar meerdere agents te verzenden:
var startExecutor = new ConcurrentStartExecutor();
De ConcurrentStartExecutor implementatie:
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Stap 5: de aggregatieuitvoerer maken
Maak een uitvoerder waarmee antwoorden van meerdere agents worden verzameld en gecombineerd:
var aggregationExecutor = new ConcurrentAggregationExecutor();
De ConcurrentAggregationExecutor implementatie:
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Stap 6: De werkstroom bouwen
Verbind de uitvoerders en agents met behulp van fan-out- en fan-in edge-patronen:
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Stap 7: De werkstroom uitvoeren
Voer de werkstroom uit en leg de streaming-uitvoer vast:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Hoe het werkt
-
Fan-Out: De
ConcurrentStartExecutorontvangt de invoervraag en de fan-out-kant stuurt deze gelijktijdig naar zowel de fysicus als de chemicus. - Parallelle verwerking: beide AI-agents verwerken dezelfde vraag gelijktijdig, elk met hun deskundige perspectief.
-
Fan-In: De
ConcurrentAggregationExecutorverzameltChatMessagereacties van beide agents. - Aggregatie: Zodra beide antwoorden zijn ontvangen, combineert de aggregator deze in een opgemaakte uitvoer.
Sleutelbegrippen
-
Fan-Out Edges: gebruik
AddFanOutEdge()om dezelfde invoer te distribueren naar meerdere executors of agents. -
Fan-In Edges: gebruiken
AddFanInEdge()om resultaten van meerdere bronexecutors te verzamelen. - INTEGRATIE van AI-agent: AI-agents kunnen rechtstreeks worden gebruikt als uitvoerders in werkstromen.
-
Executor-basisclass: Aangepaste uitvoerders erven van
Executor<TInput>en overschrijven deHandleAsync-methode. -
Turn Tokens: Gebruik
TurnTokenom agenten te signaleren om berichten in de wachtrij te verwerken. -
Streaming-uitvoering: gebruik
StreamAsync()om realtime updates op te halen terwijl de werkstroom vordert.
Implementatie voltooien
Zie het voorbeeld Concurrent/Program.cs in de Agent Framework-opslagplaats voor de volledige werkende implementatie van deze gelijktijdige workflow met AI-agents.
In de Python-implementatie bouwt u een gelijktijdige werkstroom waarmee gegevens worden verwerkt via meerdere parallelle uitvoerders en resultaten van verschillende typen worden geaggregeerd. In dit voorbeeld ziet u hoe het framework gemengde resultaattypen verwerkt van gelijktijdige verwerking.
Wat je gaat bouwen
U maakt een werkstroom die:
- Neemt een lijst met getallen als invoer
- Distribueert de lijst naar twee parallelle processen (één berekent gemiddelde, één berekent som)
- Voegt de verschillende resultaattypen (float en int) samen in een uiteindelijke uitvoer
- Demonstreert hoe het framework verschillende resultaattypen verwerkt van gelijktijdige uitvoerders
Behandelde concepten
Vereiste voorwaarden
- Python 3.10 of hoger
- Agent Framework Core geïnstalleerd:
pip install agent-framework-core --pre
Stap 1: Vereiste afhankelijkheden importeren
Begin met het importeren van de benodigde onderdelen uit Agent Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Stap 2: De Dispatcher Executor maken
De dispatcher is verantwoordelijk voor het distribueren van de initiële invoer naar meerdere parallelle uitvoerders:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Stap 3: Parallelle verwerkingsexecutors maken
Maak twee uitvoerders waarmee de gegevens gelijktijdig worden verwerkt:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Stap 4: De Aggregator Executor maken
De aggregator verzamelt resultaten van de parallelle uitvoerders en levert de uiteindelijke uitvoer op:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Stap 5: De werkstroom bouwen
Verbind de uitvoerders met behulp van Fan-out- en Fan-in-randpatronen:
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Stap 6: De werkstroom uitvoeren
Voer de werkstroom uit met voorbeeldgegevens en leg de uitvoer vast:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Hoe het werkt
-
Fan-Out: De
Dispatcherontvangt de invoerlijst en verzendt deze tegelijkertijd naar zowel deAverageals deSumuitvoerders. -
Parallelle verwerking: beide uitvoerders verwerken dezelfde invoer gelijktijdig, waardoor verschillende resultaattypen worden geproduceerd:
-
Averageuitvoerder produceert eenfloatresultaat -
Sumuitvoerder produceert eenintresultaat
-
-
Fan-In: De
Aggregatorontvangt resultaten van beide uitvoerders als een lijst met beide typen -
Typeafhandeling: Het framework verwerkt automatisch de verschillende resultaattypen met behulp van samenvoegtypen (
int | float)
Sleutelbegrippen
-
Fan-Out Edges: Gebruik
add_fan_out_edges()om dezelfde invoer naar meerdere uitvoerders te verzenden -
Fan-In Edges: gebruiken
add_fan_in_edges()om resultaten van meerdere bronexecutors te verzamelen -
Samenvoegtypen: verschillende resultaattypen verwerken met behulp van typeaantekeningen zoals
list[int | float] - Gelijktijdige uitvoering: meerdere uitvoerders verwerken gegevens tegelijkertijd, waardoor de prestaties worden verbeterd
Implementatie voltooien
Zie het aggregate_results_of_different_types.py voorbeeld in de agentframeworkopslagplaats voor de volledige werkende implementatie van deze gelijktijdige werkstroom.