Catatan
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba masuk atau mengubah direktori.
Akses ke halaman ini memerlukan otorisasi. Anda dapat mencoba mengubah direktori.
Tutorial ini menunjukkan cara membuat alur kerja bersamaan menggunakan Agent Framework. Anda akan belajar menerapkan pola fan-out dan fan-in yang memungkinkan pemrosesan paralel, memungkinkan beberapa eksekutor atau agen untuk bekerja secara bersamaan dan kemudian menggabungkan hasilnya.
Apa yang akan Anda Bangun
Anda akan membuat alur kerja yang:
- Mengambil pertanyaan sebagai input (misalnya, "Apa itu suhu?")
- Mengirimkan pertanyaan yang sama kepada dua agen AI ahli secara bersamaan (Fisikawan dan Kimia)
- Mengumpulkan dan menggabungkan respons dari kedua agen ke dalam satu output
- Menunjukkan eksekusi bersamaan dengan agen AI menggunakan pola fan-out/fan-in
Konsep yang Tercakup
Prasyarat
- .NET 8.0 SDK atau yang lebih baru
- Titik akhir layanan Azure OpenAI dan penyebaran telah dikonfigurasi
- Azure CLI diinstal dan diautentikasi (untuk autentikasi kredensial Azure)
- Aplikasi konsol baru
Langkah 1: Instal paket NuGet
Pertama, instal paket yang diperlukan untuk proyek .NET Anda:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
Langkah 2: Menyiapkan Ketergantungan dan Azure OpenAI
Mulailah dengan menyiapkan proyek Anda dengan paket NuGet dan klien Azure OpenAI yang diperlukan:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
Langkah 3: Buat Agen AI Ahli
Buat dua agen AI khusus yang akan memberikan perspektif ahli:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
Langkah 4: Buat Eksekutor Mulai
Buat pelaksana yang memulai pemrosesan bersamaan dengan mengirim input ke beberapa agen:
var startExecutor = new ConcurrentStartExecutor();
Implementasinya ConcurrentStartExecutor :
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
Langkah 5: Buat Eksekutor Agregasi
Buat pelaksana yang mengumpulkan dan menggabungkan respons dari beberapa agen:
var aggregationExecutor = new ConcurrentAggregationExecutor();
Implementasinya ConcurrentAggregationExecutor :
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
Langkah 6: Bangun Alur Kerja
Sambungkan pelaksana dan agen dengan menggunakan pola edge fan-out dan fan-in:
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
Langkah 7: Jalankan Alur Kerja
Jalankan alur kerja dan ambil output streaming:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Cara Kerjanya
-
Fan-Out:
ConcurrentStartExecutormenerima pertanyaan masukan dan tepi fan-out mengirimkannya secara simultan ke agen Ahli Fisika dan Ahli Kimia. - Pemrosesan Paralel: Kedua agen AI memproses pertanyaan yang sama secara bersamaan, masing-masing memberikan perspektif ahli mereka.
-
Fan-In: Mengumpulkan
ConcurrentAggregationExecutorChatMessagerespons dari kedua agen. - Agregasi: Setelah kedua respons diterima, agregator menggabungkannya ke dalam output yang diformat.
Konsep Utama
-
Fan-Out Edges: Gunakan
AddFanOutEdge()untuk mendistribusikan input yang sama ke beberapa pelaksana atau agen. -
Fan-In Edges: Gunakan
AddFanInEdge()untuk mengumpulkan hasil dari beberapa eksekutor sumber. - Integrasi Agen AI: Agen AI dapat digunakan langsung sebagai pelaksana dalam alur kerja.
-
Kelas Dasar Eksekutor: Eksekutor kustom mewarisi dari
Executor<TInput>dan menimpa metodeHandleAsync. -
Giliran Token: Gunakan
TurnTokenuntuk memberi sinyal kepada agen agar mulai memproses pesan yang diantrekan. -
Eksekusi Streaming: Gunakan
StreamAsync()untuk mendapatkan pembaruan real time saat alur kerja berlangsung.
Implementasi Lengkap
Untuk implementasi kerja lengkap alur kerja bersamaan ini dengan agen AI, lihat sampel Bersamaan/Program.cs di repositori Kerangka Kerja Agen.
Dalam implementasi Python, Anda akan membangun alur kerja bersamaan yang memproses data melalui beberapa eksekutor paralel dan menggabungkan hasil dari berbagai jenis. Contoh ini menunjukkan bagaimana kerangka kerja menangani jenis hasil campuran dari pemrosesan bersamaan.
Apa yang akan Anda Bangun
Anda akan membuat alur kerja yang:
- Mengambil daftar angka sebagai input
- Mendistribusikan daftar ke dua eksekutor yang bekerja secara paralel (satu menghitung rata-rata, satu menghitung jumlah)
- Mengagregasi berbagai jenis hasil (float dan int) ke dalam output akhir
- Menunjukkan bagaimana kerangka kerja menangani berbagai jenis hasil dari pelaksana bersamaan
Konsep yang Tercakup
Prasyarat
- Python 3.10 atau yang lebih baru
- Agent Framework Core terinstal:
pip install agent-framework-core --pre
Langkah 1: Impor Dependensi yang Diperlukan
Mulailah dengan mengimpor komponen yang diperlukan dari Agent Framework:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
Langkah 2: Buat Eksekutor Dispatcher
Dispatcher bertanggung jawab untuk mendistribusikan input awal ke beberapa pelaksana paralel:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
Langkah 3: Buat Pelaksana Pemrosesan Paralel
Buat dua pelaksana yang akan memproses data secara bersamaan:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
Langkah 4: Buat Eksekutor Agregator
Agregator mengumpulkan hasil dari eksekutor paralel dan menghasilkan output akhir:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
Langkah 5: Bangun Alur Kerja
Sambungkan eksekutor menggunakan pola pinggir fan-out dan fan-in:
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
Langkah 6: Jalankan Alur Kerja
Jalankan alur kerja dengan data sampel dan ambil output:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Cara Kerjanya
-
Fan-Out:
Dispatchermenerima daftar masukan dan mengirimkannya ke eksekutorAveragedanSumsecara bersamaan. -
Pemrosesan Paralel: Kedua eksekutor memproses input yang sama secara bersamaan, menghasilkan jenis hasil yang berbeda:
-
Averageeksekutorfloatmenghasilkan output -
Sumpelaksana menghasilkan hasilint
-
-
Fan-In:
AggregatorMenerima hasil dari kedua eksekutor sebagai daftar yang berisi kedua jenis -
Penanganan Jenis: Kerangka kerja secara otomatis menangani berbagai jenis hasil menggunakan jenis union (
int | float)
Konsep Utama
-
Fan-Out Edges: Gunakan
add_fan_out_edges()untuk mengirim input yang sama ke beberapa eksekutor -
Fan-In Edges: Gunakan
add_fan_in_edges()untuk mengumpulkan hasil dari berbagai eksekutor sumber -
Tipe Union: Menangani jenis hasil yang berbeda menggunakan anotasi tipe seperti
list[int | float] - Eksekusi Bersamaan: Beberapa eksekutor memproses data secara bersamaan, meningkatkan performa
Implementasi Lengkap
Untuk implementasi kerja lengkap alur kerja bersamaan ini, lihat sampel aggregate_results_of_different_types.py di repositori Kerangka Kerja Agen.