Not
Åtkomst till denna sida kräver auktorisation. Du kan prova att logga in eller byta katalog.
Åtkomst till denna sida kräver auktorisation. Du kan prova att byta katalog.
Den här självstudien visar hur du skapar ett samtidigt arbetsflöde med Agent Framework. Du lär dig att implementera fan-out och fan-in-mönster som möjliggör parallell bearbetning, vilket gör att flera exekutorer eller agenter kan arbeta samtidigt och sedan sammanställa sina resultat.
Vad du kommer att bygga
Du skapar ett arbetsflöde som:
- Tar en fråga som indata (till exempel "Vad är temperatur?")
- Skickar samma fråga till två expert-AI-agenter samtidigt (fysiker och kemist)
- Samlar in och kombinerar svar från båda agenterna till en enda utdata
- Demonstrerar samtidig körning med AI-agenter med hjälp av fan-out och fan-in-mönster
Begrepp som omfattas
Förutsättningar
- .NET 8.0 SDK eller senare
- Azure OpenAI-tjänstslutpunkt och distribution konfigurerad
- Azure CLI installerat och autentiserat (för autentisering med Azure-autentiseringsuppgifter)
- Ett nytt konsolprogram
Steg 1: Installera NuGet-paket
Installera först de nödvändiga paketen för .NET-projektet:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Steg 2: Konfigurera beroenden och Azure OpenAI
Börja med att konfigurera projektet med de nödvändiga NuGet-paketen och Azure OpenAI-klienten:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Steg 3: Skapa expert-AI-agenter
Skapa två specialiserade AI-agenter som ger expertperspektiv:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Steg 4: Skapa startexekutor
Skapa en köre som initierar samtidig bearbetning genom att skicka indata till flera agenter:
var startExecutor = new ConcurrentStartExecutor();
Implementeringen ConcurrentStartExecutor :
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Steg 5: Skapa Aggregation Executor
Skapa en exekutor som samlar in och kombinerar svar från flera agenter.
var aggregationExecutor = new ConcurrentAggregationExecutor();
Implementeringen ConcurrentAggregationExecutor :
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Steg 6: Skapa arbetsflödet
Anslut körarna och agenterna med hjälp av fläkt och fläkt-in kantsammanfogningar.
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Steg 7: Kör arbetsflödet
Kör arbetsflödet och samla in strömmande utdata:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Så här fungerar det
-
Fan-Out:
ConcurrentStartExecutortar emot frågan och fan-out gränsen skickar den till både fysikagenten och kemistagenten samtidigt. - Parallell bearbetning: Båda AI-agenterna bearbetar samma fråga samtidigt, var och en ger sitt expertperspektiv.
-
Fan-In:
ConcurrentAggregationExecutorsamlar inChatMessagesvar från båda agenterna. - Sammansättning: När båda svaren har tagits emot kombinerar aggregatorn dem till en formaterad utdata.
Viktiga begrepp
-
Fan-Out-kopplingar: Använd
AddFanOutEdge()för att distribuera samma indata till flera exekutörer eller agenter. -
Fan-In-kanter: Använd
AddFanInEdge()för att samla in resultat från flera källexekutorer. - AI-agentintegrering: AI-agenter kan användas direkt som köre i arbetsflöden.
-
Körbasklass: Anpassade utförare ärver från
Executor<TInput>och åsidosätterHandleAsyncmetoden. -
Aktivera token: Använd
TurnTokenför att signalera agenter för att börja bearbeta köade meddelanden. -
Körning av direktuppspelning: Använd
StreamAsync()för att hämta realtidsuppdateringar när arbetsflödet fortskrider.
Fullständig implementering
Fullständig implementering av det här samtidiga arbetsflödet med AI-agenter finns i exemplet Concurrent/Program.cs i Agent Framework-lagringsplatsen.
I Python-implementeringen skapar du ett samtidigt arbetsflöde som bearbetar data via flera parallella utförare och aggregerar resultat av olika typer. Det här exemplet visar hur ramverket hanterar blandade resultattyper från samtidig bearbetning.
Vad du kommer att bygga
Du skapar ett arbetsflöde som:
- Tar en lista med tal som indata
- Distribuerar listan till två parallella utförare (en beräknande medelvärde, en beräkningssumma)
- Aggregerar de olika resultattyperna (flyttal och int) till ett slutligt utdata
- Visar hur ramverket hanterar olika resultattyper från samtidiga exekutorer
Begrepp som omfattas
Förutsättningar
- Python 3.10 eller senare
- Agent Framework Core har installerats:
pip install agent-framework-core --pre
Steg 1: Importera nödvändiga beroenden
Börja med att importera nödvändiga komponenter från Agent Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Steg 2: Skapa Dispatcher Exekveraren
Den dispatcher ansvarar för att distribuera de initiala indata till flera parallella utförare.
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Steg 3: Skapa exekutorer för parallell bearbetning
Skapa två köre som bearbetar data samtidigt:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Steg 4: Skapa Aggregator-exekutorn
Aggregatorn samlar in resultat från parallella utförare och ger de slutliga utdata:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Steg 5: Skapa arbetsflödet
Anslut exekverare med hjälp av fan-out och fan-in kantmönster.
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Steg 6: Kör arbetsflödet
Kör arbetsflödet med exempeldata och samla in utdata:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Så här fungerar det
-
Utdatagren: Den
Dispatchertar emot indatalistan och skickar den samtidigt till bådeAverage- ochSum-exekverare -
Parallell bearbetning: Båda körarna bearbetar samma indata samtidigt och producerar olika resultattyper:
-
Averageexecutor ger ettfloatresultat -
Sumexecutor genererar ettintresultat
-
-
Fläkt-In:
Aggregatortar emot resultat från båda verkställarna som en lista som innehåller båda typerna -
Typhantering: Ramverket hanterar automatiskt de olika resultattyperna med hjälp av unionstyper (
int | float)
Viktiga begrepp
-
Fan-Out Ändpunkter: Använd
add_fan_out_edges()för att skicka samma indata till flera exekutorer -
Fan-In Anslutningar: Använd
add_fan_in_edges()för att samla in resultat från flera källexekutorer -
Union-typer: Hantera olika resultattyper med hjälp av typanteckningar som
list[int | float] - Samtidig körning: Flera köre bearbetar data samtidigt, vilket förbättrar prestandan
Fullständig implementering
Den fullständiga fungerande implementeringen av det här samtidiga arbetsflödet finns i exempelprogrammet aggregate_results_of_different_types.py i Agent Framework-repositoriet.