Bagikan melalui


Membuat Alur Kerja Bersamaan Sederhana

Tutorial ini menunjukkan cara membuat alur kerja bersamaan menggunakan Agent Framework. Anda akan belajar menerapkan pola fan-out dan fan-in yang memungkinkan pemrosesan paralel, memungkinkan beberapa eksekutor atau agen untuk bekerja secara bersamaan dan kemudian menggabungkan hasilnya.

Apa yang akan Anda Bangun

Anda akan membuat alur kerja yang:

  • Mengambil pertanyaan sebagai input (misalnya, "Apa itu suhu?")
  • Mengirimkan pertanyaan yang sama kepada dua agen AI ahli secara bersamaan (Fisikawan dan Kimia)
  • Mengumpulkan dan menggabungkan respons dari kedua agen ke dalam satu output
  • Menunjukkan eksekusi bersamaan dengan agen AI menggunakan pola fan-out/fan-in

Konsep yang Tercakup

Prasyarat

Langkah 1: Instal paket NuGet

Pertama, instal paket yang diperlukan untuk proyek .NET Anda:

dotnet add package Azure.AI.OpenAI --prerelease
dotnet add package Azure.Identity
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
dotnet add package Microsoft.Extensions.AI.OpenAI --prerelease

Langkah 2: Menyiapkan Ketergantungan dan Azure OpenAI

Mulailah dengan menyiapkan proyek Anda dengan paket NuGet dan klien Azure OpenAI yang diperlukan:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;
using Azure.AI.OpenAI;
using Azure.Identity;
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
using Microsoft.Extensions.AI;

public static class Program
{
    private static async Task Main()
    {
        // Set up the Azure OpenAI client
        var endpoint = Environment.GetEnvironmentVariable("AZURE_OPENAI_ENDPOINT") ?? throw new Exception("AZURE_OPENAI_ENDPOINT is not set.");
        var deploymentName = Environment.GetEnvironmentVariable("AZURE_OPENAI_DEPLOYMENT_NAME") ?? "gpt-4o-mini";
        var chatClient = new AzureOpenAIClient(new Uri(endpoint), new AzureCliCredential())
            .GetChatClient(deploymentName).AsIChatClient();

Langkah 3: Buat Agen AI Ahli

Buat dua agen AI khusus yang akan memberikan perspektif ahli:

        // Create the AI agents with specialized expertise
        ChatClientAgent physicist = new(
            chatClient,
            name: "Physicist",
            instructions: "You are an expert in physics. You answer questions from a physics perspective."
        );

        ChatClientAgent chemist = new(
            chatClient,
            name: "Chemist",
            instructions: "You are an expert in chemistry. You answer questions from a chemistry perspective."
        );

Langkah 4: Buat Eksekutor Mulai

Buat pelaksana yang memulai pemrosesan bersamaan dengan mengirim input ke beberapa agen:

        var startExecutor = new ConcurrentStartExecutor();

Implementasinya ConcurrentStartExecutor :

/// <summary>
/// Executor that starts the concurrent processing by sending messages to the agents.
/// </summary>
internal sealed class ConcurrentStartExecutor() : Executor<string>("ConcurrentStartExecutor")
{
    /// <summary>
    /// Starts the concurrent processing by sending messages to the agents.
    /// </summary>
    /// <param name="message">The user message to process</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(string message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Broadcast the message to all connected agents. Receiving agents will queue
        // the message but will not start processing until they receive a turn token.
        await context.SendMessageAsync(new ChatMessage(ChatRole.User, message), cancellationToken);

        // Broadcast the turn token to kick off the agents.
        await context.SendMessageAsync(new TurnToken(emitEvents: true), cancellationToken);
    }
}

Langkah 5: Buat Eksekutor Agregasi

Buat pelaksana yang mengumpulkan dan menggabungkan respons dari beberapa agen:

        var aggregationExecutor = new ConcurrentAggregationExecutor();

Implementasinya ConcurrentAggregationExecutor :

/// <summary>
/// Executor that aggregates the results from the concurrent agents.
/// </summary>
internal sealed class ConcurrentAggregationExecutor() :
    Executor<List<ChatMessage>>("ConcurrentAggregationExecutor")
{
    private readonly List<ChatMessage> _messages = [];

    /// <summary>
    /// Handles incoming messages from the agents and aggregates their responses.
    /// </summary>
    /// <param name="message">The message from the agent</param>
    /// <param name="context">Workflow context for accessing workflow services and adding events</param>
    /// <param name="cancellationToken">The <see cref="CancellationToken"/> to monitor for cancellation requests.
    /// The default is <see cref="CancellationToken.None"/>.</param>
    /// <returns>A task representing the asynchronous operation</returns>
    public override async ValueTask HandleAsync(List<ChatMessage> message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._messages.AddRange(message);

        if (this._messages.Count == 2)
        {
            var formattedMessages = string.Join(Environment.NewLine,
                this._messages.Select(m => $"{m.AuthorName}: {m.Text}"));
            await context.YieldOutputAsync(formattedMessages, cancellationToken);
        }
    }
}

Langkah 6: Bangun Alur Kerja

Sambungkan pelaksana dan agen dengan menggunakan pola edge fan-out dan fan-in:

        // Build the workflow by adding executors and connecting them
        var workflow = new WorkflowBuilder(startExecutor)
            .AddFanOutEdge(startExecutor, targets: [physicist, chemist])
            .AddFanInEdge(aggregationExecutor, sources: [physicist, chemist])
            .WithOutputFrom(aggregationExecutor)
            .Build();

Langkah 7: Jalankan Alur Kerja

Jalankan alur kerja dan ambil output streaming:

        // Execute the workflow in streaming mode
        await using StreamingRun run = await InProcessExecution.StreamAsync(workflow, "What is temperature?");
        await foreach (WorkflowEvent evt in run.WatchStreamAsync())
        {
            if (evt is WorkflowOutputEvent output)
            {
                Console.WriteLine($"Workflow completed with results:\n{output.Data}");
            }
        }
    }
}

Cara Kerjanya

  1. Fan-Out: ConcurrentStartExecutor menerima pertanyaan masukan dan tepi fan-out mengirimkannya secara simultan ke agen Ahli Fisika dan Ahli Kimia.
  2. Pemrosesan Paralel: Kedua agen AI memproses pertanyaan yang sama secara bersamaan, masing-masing memberikan perspektif ahli mereka.
  3. Fan-In: Mengumpulkan ConcurrentAggregationExecutorChatMessage respons dari kedua agen.
  4. Agregasi: Setelah kedua respons diterima, agregator menggabungkannya ke dalam output yang diformat.

Konsep Utama

  • Fan-Out Edges: Gunakan AddFanOutEdge() untuk mendistribusikan input yang sama ke beberapa pelaksana atau agen.
  • Fan-In Edges: Gunakan AddFanInEdge() untuk mengumpulkan hasil dari beberapa eksekutor sumber.
  • Integrasi Agen AI: Agen AI dapat digunakan langsung sebagai pelaksana dalam alur kerja.
  • Kelas Dasar Eksekutor: Eksekutor kustom mewarisi dari Executor<TInput> dan menimpa metode HandleAsync.
  • Giliran Token: Gunakan TurnToken untuk memberi sinyal kepada agen agar mulai memproses pesan yang diantrekan.
  • Eksekusi Streaming: Gunakan StreamAsync() untuk mendapatkan pembaruan real time saat alur kerja berlangsung.

Implementasi Lengkap

Untuk implementasi kerja lengkap alur kerja bersamaan ini dengan agen AI, lihat sampel Bersamaan/Program.cs di repositori Kerangka Kerja Agen.

Dalam implementasi Python, Anda akan membangun alur kerja bersamaan yang memproses data melalui beberapa eksekutor paralel dan menggabungkan hasil dari berbagai jenis. Contoh ini menunjukkan bagaimana kerangka kerja menangani jenis hasil campuran dari pemrosesan bersamaan.

Apa yang akan Anda Bangun

Anda akan membuat alur kerja yang:

  • Mengambil daftar angka sebagai input
  • Mendistribusikan daftar ke dua eksekutor yang bekerja secara paralel (satu menghitung rata-rata, satu menghitung jumlah)
  • Mengagregasi berbagai jenis hasil (float dan int) ke dalam output akhir
  • Menunjukkan bagaimana kerangka kerja menangani berbagai jenis hasil dari pelaksana bersamaan

Konsep yang Tercakup

Prasyarat

  • Python 3.10 atau yang lebih baru
  • Agent Framework Core terinstal: pip install agent-framework-core --pre

Langkah 1: Impor Dependensi yang Diperlukan

Mulailah dengan mengimpor komponen yang diperlukan dari Agent Framework:

import asyncio
import random

from agent_framework import Executor, WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, handler
from typing_extensions import Never

Langkah 2: Buat Eksekutor Dispatcher

Dispatcher bertanggung jawab untuk mendistribusikan input awal ke beberapa pelaksana paralel:

class Dispatcher(Executor):
    """
    The sole purpose of this executor is to dispatch the input of the workflow to
    other executors.
    """

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[list[int]]):
        if not numbers:
            raise RuntimeError("Input must be a valid list of integers.")

        await ctx.send_message(numbers)

Langkah 3: Buat Pelaksana Pemrosesan Paralel

Buat dua pelaksana yang akan memproses data secara bersamaan:

class Average(Executor):
    """Calculate the average of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[float]):
        average: float = sum(numbers) / len(numbers)
        await ctx.send_message(average)


class Sum(Executor):
    """Calculate the sum of a list of integers."""

    @handler
    async def handle(self, numbers: list[int], ctx: WorkflowContext[int]):
        total: int = sum(numbers)
        await ctx.send_message(total)

Langkah 4: Buat Eksekutor Agregator

Agregator mengumpulkan hasil dari eksekutor paralel dan menghasilkan output akhir:

class Aggregator(Executor):
    """Aggregate the results from the different tasks and yield the final output."""

    @handler
    async def handle(self, results: list[int | float], ctx: WorkflowContext[Never, list[int | float]]):
        """Receive the results from the source executors.

        The framework will automatically collect messages from the source executors
        and deliver them as a list.

        Args:
            results (list[int | float]): execution results from upstream executors.
                The type annotation must be a list of union types that the upstream
                executors will produce.
            ctx (WorkflowContext[Never, list[int | float]]): A workflow context that can yield the final output.
        """
        await ctx.yield_output(results)

Langkah 5: Bangun Alur Kerja

Sambungkan eksekutor menggunakan pola pinggir fan-out dan fan-in:

async def main() -> None:
    # 1) Create the executors
    dispatcher = Dispatcher(id="dispatcher")
    average = Average(id="average")
    summation = Sum(id="summation")
    aggregator = Aggregator(id="aggregator")

    # 2) Build a simple fan out and fan in workflow
    workflow = (
        WorkflowBuilder()
        .set_start_executor(dispatcher)
        .add_fan_out_edges(dispatcher, [average, summation])
        .add_fan_in_edges([average, summation], aggregator)
        .build()
    )

Langkah 6: Jalankan Alur Kerja

Jalankan alur kerja dengan data sampel dan ambil output:

    # 3) Run the workflow
    output: list[int | float] | None = None
    async for event in workflow.run_stream([random.randint(1, 100) for _ in range(10)]):
        if isinstance(event, WorkflowOutputEvent):
            output = event.data

    if output is not None:
        print(output)

if __name__ == "__main__":
    asyncio.run(main())

Cara Kerjanya

  1. Fan-Out: Dispatcher menerima daftar masukan dan mengirimkannya ke eksekutor Average dan Sum secara bersamaan.
  2. Pemrosesan Paralel: Kedua eksekutor memproses input yang sama secara bersamaan, menghasilkan jenis hasil yang berbeda:
    • Average eksekutor float menghasilkan output
    • Sumpelaksana menghasilkan hasil int
  3. Fan-In: Aggregator Menerima hasil dari kedua eksekutor sebagai daftar yang berisi kedua jenis
  4. Penanganan Jenis: Kerangka kerja secara otomatis menangani berbagai jenis hasil menggunakan jenis union (int | float)

Konsep Utama

  • Fan-Out Edges: Gunakan add_fan_out_edges() untuk mengirim input yang sama ke beberapa eksekutor
  • Fan-In Edges: Gunakan add_fan_in_edges() untuk mengumpulkan hasil dari berbagai eksekutor sumber
  • Tipe Union: Menangani jenis hasil yang berbeda menggunakan anotasi tipe seperti list[int | float]
  • Eksekusi Bersamaan: Beberapa eksekutor memproses data secara bersamaan, meningkatkan performa

Implementasi Lengkap

Untuk implementasi kerja lengkap alur kerja bersamaan ini, lihat sampel aggregate_results_of_different_types.py di repositori Kerangka Kerja Agen.

Langkah Selanjutnya