Bagikan melalui


Alur Kerja Kerangka Kerja Agen Microsoft - Permintaan dan Respons

Halaman ini memberikan gambaran umum tentang cara kerja penanganan Permintaan dan Respons dalam sistem Microsoft Agent Framework Workflow.

Gambaran Umum

Pelaksana dalam alur kerja dapat mengirim permintaan ke luar alur kerja dan menunggu respons. Ini berguna untuk skenario di mana eksekutor perlu berinteraksi dengan sistem eksternal, seperti interaksi human-in-the-loop, atau operasi asinkron lainnya.

Mengaktifkan Penanganan Permintaan dan Respons dalam Alur Kerja

Permintaan dan respons ditangani melalui jenis khusus yang disebut InputPort.

// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");

Tambahkan port input ke alur kerja.

var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
    .AddEdge(inputPort, executorA)
    .AddEdge(executorA, inputPort)
    .Build<CustomRequestType>();

Sekarang, karena dalam alur executorA kerja terhubung ke inputPort di kedua arah, executorA harus dapat mengirim permintaan dan menerima respons melalui inputPort. Berikut adalah yang perlu Anda lakukan di dalam SomeExecutor untuk mengirim permintaan dan menerima respons.

internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
    public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

Atau, SomeExecutor dapat memisahkan penanganan pengiriman dan respons permintaan menjadi dua penangan.

internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
    {
        return routeBuilder
            .AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
            .AddHandler<OtherDataType>(this.HandleOtherDataAsync);
    }

    public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
    }

    public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
    {
        // Process the message...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

Pelaksana dapat mengirim permintaan menggunakan ctx.request_info() dan menangani respons dengan @response_handler.

from agent_framework import response_handler, WorkflowBuilder

executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()

executor_a dapat mengirim permintaan dan menerima respons secara langsung menggunakan kemampuan bawaan.

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
    response_handler,
)

class SomeExecutor(Executor):

    @handler
    async def handle_data(
        self,
        data: OtherDataType,
        context: WorkflowContext,
    ):
        # Process the message...
        ...
        # Send a request using the API
        await context.request_info(
            request_data=CustomRequestType(...),
            response_type=CustomResponseType
        )

    @response_handler
    async def handle_response(
        self,
        original_request: CustomRequestType,
        response: CustomResponseType,
        context: WorkflowContext,
    ):
        # Process the response...
        ...

Dekorator @response_handler secara otomatis mendaftarkan metode untuk menangani respons untuk jenis permintaan dan respons yang ditentukan.

Menangani Permintaan dan Respons

Sebuah InputPort mengirimkan RequestInfoEvent ketika menerima permintaan. Anda dapat berlangganan event ini untuk menangani permintaan masuk dari alur kerja. Saat Anda menerima respons dari sistem eksternal, kirim kembali ke alur kerja menggunakan mekanisme respons. Kerangka kerja secara otomatis merutekan respons ke pelaksana yang mengirim permintaan asli.

StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
            await handle.SendResponseAsync(response).ConfigureAwait(false);
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            // The workflow has completed successfully
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            return;
    }
}

Pelaksana dapat mengirim permintaan secara langsung tanpa memerlukan komponen terpisah. Ketika pelaksana memanggil ctx.request_info(), alur kerja memancarkan RequestInfoEvent. Anda dapat berlangganan event ini untuk menangani permintaan masuk dari alur kerja. Saat Anda menerima respons dari sistem eksternal, kirim kembali ke alur kerja menggunakan mekanisme respons. Kerangka kerja secara otomatis merutekan respons ke metode pelaksana @response_handler .

from agent_framework import RequestInfoEvent

while True:
    request_info_events : list[RequestInfoEvent] = []
    pending_responses : dict[str, CustomResponseType] = {}

    stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)

    async for event in stream:
        if isinstance(event, RequestInfoEvent):
            # Handle `RequestInfoEvent` from the workflow
            request_info_events.append(event)

    if not request_info_events:
        break

    for request_info_event in request_info_events:
        # Handle `RequestInfoEvent` from the workflow
        response = CustomResponseType(...)
        pending_responses[request_info_event.request_id] = response

Titik Pemeriksaan dan Permintaan

Untuk mempelajari selengkapnya tentang titik pemeriksaan, lihat Titik pemeriksaan.

Saat titik pemeriksaan dibuat, permintaan yang tertunda juga disimpan sebagai bagian dari status titik pemeriksaan. Saat Anda memulihkan dari titik pemeriksaan, setiap permintaan yang tertunda akan dikirim ulang sebagai RequestInfoEvent objek, memungkinkan Anda untuk menangkap dan merespons permintaan tersebut. Anda tidak dapat memberikan respons secara langsung selama operasi resume - sebagai gantinya, Anda harus mendengarkan peristiwa yang dipancarkan ulang dan merespons menggunakan mekanisme respons standar.

Langkah Selanjutnya