Not
Bu sayfaya erişim yetkilendirme gerektiriyor. Oturum açmayı veya dizinleri değiştirmeyi deneyebilirsiniz.
Bu sayfaya erişim yetkilendirme gerektiriyor. Dizinleri değiştirmeyi deneyebilirsiniz.
Bu öğreticide, Agent Framework kullanarak eşzamanlı iş akışının nasıl oluşturulacağı gösterilmektedir. Paralel işlemeyi sağlayan, birden çok yürütücü veya aracının aynı anda çalışmasına ve ardından sonuçlarını toplamasına olanak tanıyan fan-out ve fan-in desenleri uygulamayı öğreneceksiniz.
Neler Yapacaksınız
Şu şekilde bir iş akışı oluşturacaksınız:
- Giriş olarak bir soru alır (örneğin, "Sıcaklık nedir?")
- Aynı soruyu aynı anda iki uzman yapay zeka aracısına gönderir (Fizikçi ve Kimyacı)
- Her iki aracıdan gelen yanıtları toplar ve tek bir çıkışta birleştirir
- Fan-out/fan-in desenlerini kullanarak yapay zeka aracılarıyla eşzamanlı yürütmeyi gösterir
Ele Alınan Kavramlar
Önkoşullar
- .NET 8.0 SDK veya üzeri
- Azure OpenAI hizmet uç noktası ve dağıtımı yapılandırıldı
- Azure CLI yüklü ve oturum açılmış (Azure kimlik bilgisi kimlik doğrulaması için)
- Yeni bir konsol uygulaması
1. Adım: NuGet paketlerini yükleme
İlk olarak, .NET projeniz için gerekli paketleri yükleyin:
dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease
2. Adım: Bağımlılıkları ve Azure OpenAI'i Ayarlama
Projenizi gerekli NuGet paketleri ve Azure OpenAI istemcisiyle ayarlayarak başlayın:
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;
public static class Program
{
private static async Task Main()
{
// Set up the Azure OpenAI client
var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
.GetChatClient(deploymentName).AsIChatClient();
3. Adım: Uzman yapay zeka aracıları oluşturma
Uzman perspektifleri sağlayacak iki özel yapay zeka aracısı oluşturun:
// Create the AI agents with specialized expertise
ChatClientAgent physicist = new(
chatClient,
name: "Physicist",
instructions: "You are an expert in physics. You answer questions from a physics perspective."
);
ChatClientAgent chemist = new(
chatClient,
name: "Chemist",
instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
);
4. Adım: Başlat Yürütücüsü Oluşturma
Birden çok aracıya giriş göndererek eşzamanlı işlemeyi başlatan bir yürütücü oluşturun:
var startExecutor = new ConcurrentStartExecutor();
Uygulama yöntemi ConcurrentStartExecutor :
/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
/// <summary>
/// Starts the concurrent processing by sending messages to the agents.
/// </summary>
/// <param name="message">The user message to process</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Broadcast the message to all connected agents. Receiving agents will queue
// the message but will not start processing until they receive a turn token.
await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);
// Broadcast the turn token to kick off the agents.
await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
}
}
5. Adım: Toplama Yürütücüsü Oluşturma
Birden çok aracıdan gelen yanıtları toplayan ve birleştiren bir yürütücü oluşturun:
var aggregationExecutor = new ConcurrentAggregationExecutor();
Uygulama yöntemi ConcurrentAggregationExecutor :
/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
private readonly List<ChatMessage> _messages = [];
/// <summary>
/// Handles incoming messages from the agents and aggregates their responses.
/// </summary>
/// <param name="message">The message from the agent</param>
/// <param name="context">Workflow context for accessing workflow services and adding events</param>
/// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
/// The default is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task representing the asynchronous operation</returns>
public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._messages.AddRange(message);
if (this._messages.Count == 2)
{
var formattedMessages = string.Join(Environment.NewLine,
this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
await context.YieldOutputAsync(formattedMessages, cancellationToken);
}
}
}
6. Adım: İş Akışını Oluşturma
Yürütücüleri ve aracıları, uç ve fanlı kenar desenlerini kullanarak bağlayın:
// Build the workflow by adding executors and connecting them
var workflow = new WorkflowBuilder(startExecutor)
.AddFanOutEdge(startExecutor, targets: [physicist, chemist])
.AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
.WithOutputFrom(aggregationExecutor)
.Build();
7. Adım: İş Akışını Yürütme
İş akışını çalıştırın ve akış çıkışını yakalayın:
// Execute the workflow in streaming mode
await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is WorkflowOutputEvent output)
{
Console.WriteLine($"Workflow completed with results:\n{output.Data}");
}
}
}
}
Nasıl Çalışır?
-
Fan-Out:
ConcurrentStartExecutorgiriş sorusunu alır ve fan-out kenarı, soruyu aynı anda hem Fizikçi hem de Kimyager ajanlara gönderir. - Paralel İşleme: Her iki yapay zeka aracısı da aynı soruyu eşzamanlı olarak işler ve her biri kendi uzman perspektifini sağlar.
-
Fan-In: her
ConcurrentAggregationExecutoriki aracıdan da yanıt toplarChatMessage. - Toplama: Her iki yanıt da alındıktan sonra toplayıcı bunları biçimlendirilmiş bir çıkışta birleştirir.
Önemli Kavramlar
-
Fan-Out Kenarları: Aynı girişi birden çok yürütücüye veya aracıya dağıtmak için
AddFanOutEdge()kullanın. -
Fan-In Edges: Birden çok kaynak uygulayıcıdan sonuç toplamak için
AddFanInEdge()kullanın. - Yapay Zeka Aracısı Tümleştirmesi: Yapay zeka aracıları doğrudan iş akışlarında yürütücü olarak kullanılabilir.
-
Yürütücü Temel Sınıfı: Özel yürütücüler
Executor<TInput>sınıfını devralır veHandleAsyncyöntemini geçersiz kılar. -
Geçiş Belirteçlerini Kullanın: Aracılara kuyruğa alınan iletileri işlemeye başlamaları için sinyal vermek üzere
TurnTokenkullanın. -
Akış Yürütme: İş akışı ilerledikçe gerçek zamanlı güncelleştirmeleri almak için kullanın
StreamAsync().
Uygulamayı Tamamla
Yapay zeka aracılarıyla bu eşzamanlı iş akışının tam olarak çalışması için Agent Framework deposundaki Concurrent/Program.cs örneğine bakın.
Python uygulamasında, birden çok paralel yürütücü aracılığıyla verileri işleyen ve farklı türlerdeki sonuçları toplayan eşzamanlı bir iş akışı oluşturacaksınız. Bu örnekte, çerçevenin eşzamanlı işlemeden karma sonuç türlerini nasıl işlediği gösterilmektedir.
Neler Yapacaksınız
Şu şekilde bir iş akışı oluşturacaksınız:
- Sayı listesini giriş olarak alır
- Listeyi iki paralel yürütücüye dağıtır (bir hesaplama ortalaması, bir hesaplama toplamı)
- Farklı sonuç türlerini (float ve int) son çıktıda toplar
- Çerçevenin eşzamanlı yürütücülerden farklı sonuç türlerini nasıl işlediğini gösterir
Ele Alınan Kavramlar
Önkoşullar
- Python 3.10 veya üzeri
- Agent Framework Core yüklendi:
pip install agent-framework-core --pre
1. Adım: Gerekli Bağımlılıkları İçeri Aktarma
Agent Framework'ten gerekli bileşenleri içeri aktararak başlayın:
import asyncio
import random
from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never
2. Adım: Dağıtıcı Yürütücüsü Oluşturma
Dağıtıcı, ilk girişi birden çok paralel yürütücüye dağıtmaktan sorumludur:
class Dispatcher(Executor):
"""
The sole purpose of this executor is to dispatch the input of the workflow to
other executors.
"""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
if not numbers:
raise RuntimeError("Input must be a valid list of integers.")
await ctx.send_message(numbers)
3. Adım: Paralel İşleme Yürütücüleri Oluşturma
Verileri eşzamanlı olarak işleyecek iki yürütücü oluşturun:
class Average(Executor):
"""Calculate the average of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
average: float = sum(numbers) / len(numbers)
await ctx.send_message(average)
class Sum(Executor):
"""Calculate the sum of a list of integers."""
@handler
async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
total: int = sum(numbers)
await ctx.send_message(total)
4. Adım: Toplayıcı Yürütücüsü Oluşturma
Toplayıcı, paralel yürütücülerden sonuçları toplar ve son çıkışı verir:
class Aggregator(Executor):
"""Aggregate the results from the different tasks and yield the final output."""
@handler
async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
"""Receive the results from the source executors.
The framework will automatically collect messages from the source executors
and deliver them as a list.
Args:
results (list[int | float]): execution results from upstream executors.
The type annotation must be a list of union types that the upstream
executors will produce.
ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
"""
await ctx.yield_output(results)
5. Adım: İş Akışını Oluşturma
Yürütücüleri fan-out ve fan-in kenar desenlerini kullanarak bağlayın.
async def main() -> None:
# 1) Create the executors
dispatcher = Dispatcher(id="dispatcher")
average = Average(id="average")
summation = Sum(id="summation")
aggregator = Aggregator(id="aggregator")
# 2) Build a simple fan out and fan in workflow
workflow = (
WorkflowBuilder()
.set_start_executor(dispatcher)
.add_fan_out_edges(dispatcher, [average, summation])
.add_fan_in_edges([average, summation], aggregator)
.build()
)
6. Adım: İş Akışını Çalıştırma
Örnek verilerle iş akışını yürütün ve çıktıyı kaydedin:
# 3) Run the workflow
output: list[int | float] | None = None
async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
if isinstance(event, WorkflowOutputEvent):
output = event.data
if output is not None:
print(output)
if __name__ == "__main__":
asyncio.run(main())
Nasıl Çalışır?
-
Fan-Out: giriş
Dispatcherlistesini alır ve hemAveragehem deSumyürütücülerine aynı anda gönderir -
Paralel İşleme: Her iki yürütücü de aynı girişi eşzamanlı olarak işler ve farklı sonuç türleri oluşturur:
-
Averageyürütücü birfloatsonuç üretir -
Sumyürütücü birintsonuç üretir
-
-
Fan-In: her
Aggregatoriki yürütücüden de sonuçları her iki türü içeren bir liste olarak alır -
Tür İşleme: Çerçeve birleşim türlerini kullanarak farklı sonuç türlerini otomatik olarak işler (
int | float)
Önemli Kavramlar
-
Fan-Out Edge'ler: Aynı girişi birden çok yürütücüye göndermek için kullanın
add_fan_out_edges() -
Fan-In Edge'ler: Birden çok kaynak yürütücüden sonuç toplamak için kullanın
add_fan_in_edges() -
Birleşim Türleri: Gibi tür ek açıklamalarını kullanarak farklı sonuç türlerini işleme
list[int | float] - Eşzamanlı Yürütme: Birden çok yürütücü verileri aynı anda işleyerek performansı artırır
Uygulamayı Tamamla
Bu eşzamanlı iş akışının tam çalışma uygulaması için Agent Framework deposundaki aggregate_results_of_different_types.py örneğine bakın.