Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tento kurz ukazuje, jak vytvořit souběžný pracovní postup pomocí rozhraní Agent Framework. Naučíte se implementovat vzory typu fan-out a fan-in, které umožňují paralelní zpracování, což umožňuje několika vykonavatelům nebo agentům pracovat současně a následně agregovat jejich výsledky.
Co budete vytvářet
Vytvoříte pracovní postup, který:
- Vezme otázku jako vstup (například "Co je teplota?").
- Odešle stejnou otázku dvěma odborníkům na umělou inteligenci současně (fyzik a chemik).
- Shromažďuje a kombinuje odpovědi z obou agentů do jednoho výstupu.
- Demonstruje současné spuštění s AI agenty pomocí rozvětvení/sbíhání vzorů.
Pokryté koncepty
Požadavky
- Sada .NET 8.0 SDK nebo novější
- Konfigurace koncového bodu a nasazení služby Azure OpenAI
- Nainstalované a ověřené rozhraní příkazového řádku Azure(pro ověřování přihlašovacích údajů Azure)
- Nová konzolová aplikace
Krok 1: Instalace balíčků NuGet
Nejprve nainstalujte požadované balíčky pro váš projekt .NET:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Krok 2: Nastavení závislostí a Azure OpenAI
Začněte nastavením projektu s požadovanými balíčky NuGet a klientem Azure OpenAI:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Krok 3: Vytvoření agentů expertní umělé inteligence
Vytvořte dva specializované agenty umělé inteligence, které budou poskytovat odborné perspektivy:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Krok 4: Vytvoření spouštěcího exekutoru
Vytvořte exekutor, který zahájí souběžné zpracování odesláním vstupu několika agentům:
var startExecutor = new ConcurrentStartExecutor();
Implementace ConcurrentStartExecutor :
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Krok 5: Vytvoření exekutoru agregace
Vytvořte exekutor, který shromažďuje a kombinuje odpovědi z více agentů:
var aggregationExecutor = new ConcurrentAggregationExecutor();
Implementace ConcurrentAggregationExecutor :
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Krok 6: Sestavení pracovního postupu
Připojte exekutory a agenty pomocí hraničních vzorů fan-out a fan-in.
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Krok 7: Provedení pracovního postupu
Spusťte pracovní postup a zachyťte výstup streamování:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Jak to funguje
-
Fan-Out:
ConcurrentStartExecutorpřijme vstupní otázku a hrana fan-out ji odešle jak agentu fyzikovi, tak agentu chemikovi současně. - Paralelní zpracování: Oba agenti umělé inteligence zpracovávají stejnou otázku souběžně, přičemž každý z nich poskytuje svou odbornou perspektivu.
-
Fan-In: Shromažďuje
ConcurrentAggregationExecutorChatMessageodpovědi od obou agentů. - Agregace: Po přijetí obou odpovědí agregátor je zkombinuje do formátovaného výstupu.
Klíčové koncepty
-
Větvení: Pomocí
AddFanOutEdge()můžete distribuovat stejný vstup na více vykonavatelů nebo agentů. -
Fan-In hrany: Slouží
AddFanInEdge()ke shromažďování výsledků z více zdrojových exekutorů. - Integrace agenta AI: Agenty AI je možné použít přímo jako exekutory v pracovních postupech.
-
Základní třída Executor: Vlastní exekutory dědí ze
Executor<TInput>a přepíší metoduHandleAsync. -
Turn Tokens: Použijte
TurnTokenk signalizaci agentům k zahájení zpracování zpráv ve frontě. -
Spouštění streamování: Slouží
StreamAsync()k získání aktualizací v reálném čase při průběhu pracovního postupu.
Dokončení implementace
Kompletní funkční implementaci tohoto souběžného pracovního postupu s agenty AI najdete v ukázce Concurrent/Program.cs v úložišti Agent Framework.
V implementaci Pythonu vytvoříte souběžný pracovní postup, který zpracovává data prostřednictvím několika paralelních exekutorů a agreguje výsledky různých typů. Tento příklad ukazuje, jak architektura zpracovává smíšené typy výsledků ze souběžného zpracování.
Co budete vytvářet
Vytvoříte pracovní postup, který:
- Vezme jako vstup seznam čísel.
- Rozdělí seznam do dvou paralelních exekutorů (jeden výpočet průměru, jeden výpočet součtu).
- Agreguje různé typy výsledků (float a int) do konečného výstupu.
- Ukazuje, jak architektura zpracovává různé typy výsledků od souběžných exekutorů.
Pokryté koncepty
Požadavky
- Python 3.10 nebo novější
- Nainstalované rozhraní Agent Framework Core:
pip install agent-framework-core
Krok 1: Import požadovaných závislostí
Začněte importem potřebných komponent z agenta Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Krok 2: Vytvoření vykonavatele dispečera
Dispečer zodpovídá za distribuci počátečního vstupu do několika paralelních exekutorů:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Krok 3: Vytvoření exekutorů paralelního zpracování
Vytvořte dva exekutory, které budou zpracovávat data současně:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Krok 4: Vytvoření exekutoru agregátoru
Agregátor shromažďuje výsledky z paralelních exekutorů a poskytuje konečný výstup:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Krok 5: Sestavení pracovního postupu
Připojte vykonavatele pomocí vzorů rozdělovacího a sběrného spoje.
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Krok 6: Spuštění pracovního postupu
Spusťte pracovní postup s ukázkovými daty a zaznamenejte výstup:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Jak to funguje
-
Fan-Out:
Dispatcherpřijme vstupní seznam a současně ho odešle jak exekutorůmAverage, tak i exekutorůmSum. -
Paralelní zpracování: Oba exekutory zpracovávají stejný vstup souběžně a vytvářejí různé typy výsledků:
-
AverageExekutor vytvořífloatvýsledek. -
SumExekutor vytvoříintvýsledek.
-
-
Fan-In:
Aggregatorshromažďuje výsledky od obou vykonávačů do seznamu, který obsahuje oba typy. -
Zpracování typů: Architektura automaticky zpracovává různé typy výsledků pomocí sjednocovacího typu (
int | float)
Klíčové koncepty
-
Rozvětvené hrany: Slouží
add_fan_out_edges()k odeslání stejného vstupu do více vykonavatelů. -
Fan-In hrany: Slouží
add_fan_in_edges()ke shromažďování výsledků z více zdrojových exekutorů. -
Typy sjednocení: Zpracování různých typů výsledků pomocí poznámek typu, jako je
list[int | float] - Souběžné provádění: Více výkonných jednotek zpracovává data současně, což zlepšuje výkon.
Dokončení implementace
Kompletní funkční implementaci tohoto souběžného pracovního postupu najdete v ukázce aggregate_results_of_different_types.py v úložišti Agent Framework.