Alur Kerja Kerangka Kerja Agen Microsoft - Human-in-the-loop (HITL)

Halaman ini memberikan gambaran umum interaksi Human-in-the-loop (HITL) dalam sistem Alur Kerja Kerangka Kerja Agen Microsoft. HITL dicapai melalui mekanisme penanganan permintaan dan respons dalam alur kerja, yang memungkinkan pelaksana mengirim permintaan ke sistem eksternal (seperti operator manusia) dan menunggu respons mereka sebelum melanjutkan eksekusi alur kerja.

Ikhtisar

Pelaksana dalam alur kerja dapat mengirim permintaan ke luar alur kerja dan menunggu respons. Ini berguna untuk skenario di mana eksekutor perlu berinteraksi dengan sistem eksternal, seperti interaksi human-in-the-loop, atau operasi asinkron lainnya.

Mari kita buat alur kerja yang meminta operator manusia untuk menebak angka dan menggunakan pelaksana untuk menilai apakah tebakan tersebut benar.

Mengaktifkan Penanganan Permintaan dan Respons dalam Alur Kerja

Permintaan dan respons ditangani melalui jenis khusus yang disebut RequestPort.

RequestPort adalah saluran komunikasi yang memungkinkan pelaksana mengirim permintaan dan menerima respons. Ketika eksekutor mengirim pesan ke RequestPort, port permintaan memancarkan RequestInfoEvent yang berisi detail permintaan. Sistem eksternal dapat mendengarkan peristiwa ini, memproses permintaan, dan mengirim respons kembali ke alur kerja. Kerangka kerja secara otomatis merutekan respons kembali ke pelaksana yang sesuai berdasarkan permintaan asli.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Tambahkan port input ke alur kerja.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

Definisi JudgeExecutor membutuhkan nomor target dan dapat menilai apakah tebakan tersebut benar. Jika tidak benar, itu akan mengirim permintaan lain melalui RequestPort untuk meminta tebakan baru.

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

Di Python, eksekutor mengirim permintaan menggunakan ctx.request_info() dan menangani respons dengan @response_handler dekorator.

Mari kita buat alur kerja yang meminta operator manusia untuk menebak angka dan menggunakan pelaksana untuk menilai apakah tebakan tersebut benar.

Mengaktifkan Penanganan Permintaan dan Respons dalam Alur Kerja

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

Dekorator @response_handler secara otomatis mendaftarkan metode untuk menangani respons untuk jenis permintaan dan respons yang ditentukan. Kerangka kerja mencocokkan respons masuk ke pengelola yang benar berdasarkan jenis anotasi dari original_request dan response parameter.

Menangani Permintaan dan Respons

Sebuah RequestPort mengirimkan RequestInfoEvent ketika menerima permintaan. Anda dapat berlangganan event ini untuk menangani permintaan masuk dari alur kerja. Saat Anda menerima respons dari sistem eksternal, kirim kembali ke alur kerja menggunakan mekanisme respons. Kerangka kerja secara otomatis merutekan respons ke pelaksana yang mengirim permintaan asli.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Petunjuk / Saran

Lihat sampel lengkap untuk proyek lengkap yang dapat dijalankan.

Pelaksana dapat mengirim permintaan secara langsung tanpa memerlukan komponen terpisah. Saat pelaksana memanggil ctx.request_info(), alur kerja menghasilkan WorkflowEvent dengan type == "request_info". Anda dapat berlangganan event ini untuk menangani permintaan masuk dari alur kerja. Saat Anda menerima respons dari sistem eksternal, kirim kembali ke alur kerja menggunakan mekanisme respons. Kerangka kerja secara otomatis merutekan respons ke metode pelaksana @response_handler .

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Petunjuk / Saran

Lihat sampel lengkap ini untuk file lengkap yang dapat dijalankan.

Human-in-the-Loop dengan Orkestrasi Agen

Pola yang RequestPort dijelaskan di atas berfungsi dengan eksekutor kustom dan WorkflowBuilder. Saat menggunakan orkestrasi agen (seperti alur yang berurutan, bersamaan, atau obrolan grup), persetujuan alat dicapai melalui mekanisme permintaan/respons manusia-dalam-proses.

Agen dapat menggunakan alat yang memerlukan persetujuan manusia sebelum eksekusi. Ketika agen mencoba memanggil alat yang memerlukan persetujuan, alur kerja menjeda dan menghasilkan RequestInfoEvent seperti pada pola RequestPort, tetapi muatan acara berisi ToolApprovalRequestContent (C#) atau Content dengan type == "function_approval_request" (Python) alih-alih jenis permintaan kustom.

Titik Pemeriksaan dan Permintaan

Untuk mempelajari selengkapnya tentang titik pemeriksaan, lihat Titik pemeriksaan.

Saat titik pemeriksaan dibuat, permintaan yang tertunda juga disimpan sebagai bagian dari status titik pemeriksaan. Saat Anda memulihkan dari titik pemeriksaan, setiap permintaan yang tertunda akan dikirim ulang sebagai RequestInfoEvent objek, memungkinkan Anda untuk menangkap dan merespons permintaan tersebut. Anda tidak dapat memberikan respons secara langsung selama operasi resume - sebagai gantinya, Anda harus mendengarkan peristiwa yang dipancarkan ulang dan merespons menggunakan mekanisme respons standar.

Langkah Selanjutnya