Bagikan melalui


Menangani Permintaan dan Respons dalam Alur Kerja

Tutorial ini menunjukkan cara menangani permintaan dan respons dalam alur kerja menggunakan Alur Kerja Kerangka Kerja Agen. Anda akan mempelajari cara membuat alur kerja interaktif yang dapat menjeda eksekusi untuk meminta input dari sumber eksternal (seperti manusia atau sistem lain) lalu melanjutkan setelah respons disediakan.

Konsep yang Tercakup

Di .NET, alur kerja human-in-the-loop menggunakan RequestPort dan penanganan permintaan eksternal untuk menjeda eksekusi dan mengumpulkan input pengguna. Pola ini memungkinkan alur kerja interaktif di mana sistem dapat meminta informasi dari sumber eksternal selama eksekusi.

Prasyarat

Menginstal paket NuGet

Pertama, instal paket yang diperlukan untuk proyek .NET Anda:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Komponen Utama

RequestPort dan Permintaan Eksternal

RequestPort bertindak sebagai jembatan antara alur kerja dan sumber input eksternal. Ketika alur kerja membutuhkan input, alur kerja menghasilkan RequestInfoEvent yang ditangani aplikasi Anda:

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Jenis Sinyal

Tentukan jenis sinyal untuk mengomunikasikan berbagai jenis permintaan:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Pelaksana Alur Kerja

Buat pelaksana yang memproses input pengguna dan berikan umpan balik:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

Membangun Alur Kerja

Sambungkan RequestPort dan eksekutor dalam perulangan umpan balik:

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Menjalankan Alur Kerja Interaktif

Tangani permintaan eksternal selama eksekusi alur kerja:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Penanganan Permintaan

Proses berbagai jenis permintaan input:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Konsep Implementasi

Alur Peristiwa PermintaanInfo

  1. Eksekusi Alur Kerja: Alur kerja memproses hingga memerlukan input eksternal
  2. Pembuatan Permintaan: RequestPort menghasilkan RequestInfoEvent dengan detail permintaan
  3. Penanganan Eksternal: Aplikasi Anda menangkap peristiwa dan mengumpulkan input pengguna
  4. Pengiriman Respons: Kirim ExternalResponse kembali untuk melanjutkan alur kerja
  5. Dimulainya Kembali Alur Kerja: Alur kerja terus diproses dengan input yang disediakan

Siklus Hidup Alur Kerja

  • Eksekusi Streaming: Gunakan StreamAsync untuk memantau peristiwa secara real-time
  • Penanganan Peristiwa: Proses RequestInfoEvent untuk permintaan input dan WorkflowOutputEvent untuk penyelesaian
  • Koordinasi Respons: Mencocokkan respons terhadap permintaan menggunakan mekanisme penanganan respons alur kerja

Alur Implementasi

  1. Inisialisasi Alur Kerja: Alur kerja dimulai dengan mengirim NumberSignal.Init ke RequestPort.

  2. Pembuatan Permintaan: RequestPort menghasilkan RequestInfoEvent tebakan awal yang diminta dari pengguna.

  3. Jeda Alur Kerja: Alur kerja menjeda dan menunggu input eksternal saat aplikasi menangani permintaan.

  4. Respons Manusia: Aplikasi eksternal mengumpulkan input pengguna dan mengirim ExternalResponse kembali ke alur kerja.

  5. Pemrosesan dan Umpan Balik: Memproses JudgeExecutor tebakan dan menyelesaikan alur kerja atau mengirim sinyal baru (Di Atas/Di Bawah) untuk meminta tebakan lain.

  6. Berlanjutnya Perulangan: Proses berlanjut sampai angka yang benar ditebak.

Manfaat Kerangka Kerja

  • Keamanan Jenis: Pengetikan yang kuat memastikan kontrak respons permintaan dipertahankan
  • Berbasis Peristiwa: Sistem peristiwa yang kaya memberikan visibilitas ke dalam eksekusi alur kerja
  • Eksekusi yang Dapat Dijeda: Alur kerja dapat menjeda tanpa batas waktu saat menunggu input eksternal
  • Manajemen Status: Status alur kerja dipertahankan di seluruh siklus jeda-resume
  • Integrasi Fleksibel: RequestPorts dapat diintegrasikan dengan sumber input eksternal apa pun (UI, API, konsol, dll.)

Sampel Lengkap

Untuk implementasi kerja lengkap, lihat sampel Human-in-the-Loop Basic.

Pola ini memungkinkan pembangunan aplikasi interaktif canggih di mana pengguna dapat memberikan input pada titik keputusan utama dalam alur kerja otomatis.

Apa yang akan Anda Bangun

Anda akan membuat alur kerja permainan tebakan angka interaktif yang menunjukkan pola respons permintaan:

  • Agen AI yang membuat tebakan cerdas
  • Pelaksana yang dapat langsung mengirim permintaan menggunakan request_info API
  • Manajer giliran yang berkoordinasi antara agen dan interaksi manusia menggunakan @response_handler
  • Input/output konsol interaktif untuk umpan balik waktu nyata

Prasyarat

  • Python 3.10 atau yang lebih baru
  • Penyebaran Azure OpenAI telah dikonfigurasi
  • Konfigurasi autentikasi Azure CLI telah berhasil dilakukan (az login)
  • Pemahaman dasar tentang pemrograman asinkron Python

Konsep Utama

Kemampuan Permintaan dan Respons

Pelaksana memiliki kemampuan permintaan dan respons bawaan yang memungkinkan interaksi dengan keterlibatan manusia (human-in-the-loop):

  • Hubungi ctx.request_info(request_data=request_data, response_type=response_type) untuk mengirimkan permintaan
  • @response_handler Gunakan dekorator untuk menangani respons
  • Menentukan jenis permintaan/respons kustom tanpa persyaratan pewarisan

Aliran Permintaan-Respons

Pelaksana dapat mengirim permintaan secara langsung menggunakan ctx.request_info() dan menangani respons menggunakan @response_handler dekorator:

  1. Panggilan eksekutor ctx.request_info(request_data=request_data, response_type=response_type)
  2. Alur kerja menghasilkan RequestInfoEvent dengan data permintaan
  3. Sistem eksternal (manusia, API, dll.) memproses permintaan
  4. Respons dikirim kembali melalui send_responses_streaming()
  5. Alur kerja dilanjutkan dan memberikan respons terhadap metode pelaksana @response_handler

Menyiapkan Lingkungan

Pertama, instal paket yang diperlukan:

pip install agent-framework-core --pre
pip install azure-identity

Tentukan Model Permintaan dan Respons

Mulailah dengan menentukan struktur data untuk komunikasi respons permintaan:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

HumanFeedbackRequest adalah klasifikasi data sederhana untuk payload permintaan terstruktur:

  • Pengetikan ketat untuk muatan data permintaan
  • Validasi yang kompatibel dengan versi mendatang
  • Perjelas semantik korelasi pada respons
  • Bidang kontekstual (seperti tebakan sebelumnya) untuk perintah UI yang kaya

Membuat Turn Manager

Manajer giliran mengoordinasikan aliran antara agen AI dan manusia:

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

Membangun Alur Kerja

Buat alur kerja utama yang menyambungkan semua komponen:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.as_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("πŸ”„ State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\nπŸ€– {prompt}")
                answer = input("πŸ‘€ Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("πŸ‘‹ Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\nπŸŽ‰ {workflow_output}")

Menjalankan Contoh Program

Untuk implementasi kerja lengkap, lihat sampel Human-in-the-Loop Guessing Game.

Cara Kerjanya

  1. Inisialisasi Alur Kerja: Alur kerja dimulai dengan TurnManager meminta tebakan awal dari agen AI.

  2. Respons Agen: Agen AI membuat tebakan dan mengembalikan JSON terstruktur, yang mengalir kembali ke TurnManager.

  3. Permintaan Manusia: TurnManager memproses tebakan agen dan memanggil ctx.request_info() dengan HumanFeedbackRequest.

  4. Jeda Alur Kerja: Alur kerja memancarkan RequestInfoEvent dan berlanjut hingga tidak ada tindakan lebih lanjut yang dapat diambil, lalu menunggu input manusia.

  5. Respons Manusia: Aplikasi eksternal mengumpulkan input manusia dan mengirim respons kembali menggunakan send_responses_streaming().

  6. Lanjutkan dan Teruskan: Alur kerja dilanjutkan, metode TurnManager dari @response_handler memproses umpan balik manusia, dan mengakhiri permainan atau mengirim permintaan lain ke agen.

Manfaat Utama

  • Komunikasi Terstruktur: Model permintaan dan respons jenis aman mencegah kesalahan runtime
  • Korelasi: ID permintaan memastikan respons cocok dengan permintaan yang benar
  • Eksekusi yang Dapat Dijeda: Alur kerja dapat menjeda tanpa batas waktu saat menunggu input eksternal
  • Preservasi Status: Status alur kerja dipertahankan di seluruh siklus jeda-resume
  • Berbasis Peristiwa: Sistem peristiwa yang kaya memberikan visibilitas ke dalam status alur kerja dan transisi

Pola ini memungkinkan pembangunan aplikasi interaktif canggih di mana agen AI dan manusia berkolaborasi dengan mulus dalam alur kerja terstruktur.

Langkah Selanjutnya