Microsoft Agent Framework İş Akışları - İnsan Katılımıyla (HITL)

Bu sayfa, Microsoft Agent Framework İş Akışı sistemindeki Döngüdeki İnsan (HITL) etkileşimlerine genel bir bakış sağlar. HITL, yürütücülerin dış sistemlere (insan işleçler gibi) istek göndermesine ve iş akışı yürütmeye devam etmeden önce yanıtlarını beklemesine olanak tanıyan iş akışlarında istek ve yanıt işleme mekanizması aracılığıyla elde edilir.

Genel bakış

bir iş akışındaki yürütücüler iş akışının dışına istek gönderebilir ve yanıtları bekleyebilir. Bu, yürütücünün döngüdeki insan etkileşimleri veya diğer zaman uyumsuz işlemler gibi dış sistemlerle etkileşim kurması gereken senaryolar için kullanışlıdır.

Şimdi bir insan operatörden bir sayıyı tahmin etmelerini isteyen ve tahminin doğru olup olmadığını yargılamak için yürütücü kullanan bir iş akışı oluşturalım.

İş Akışında İstek ve Yanıt İşlemeyi Etkinleştirme

İstekler ve yanıtlar adlı RequestPortözel bir tür aracılığıyla işlenir.

A RequestPort , yürütücülerin istek göndermesine ve yanıt almasına olanak tanıyan bir iletişim kanalıdır. Yürütücü bir RequestPort öğesine ileti gönderdiği zaman, talep portu isteğin ayrıntılarını içeren bir RequestInfoEvent yayar. Dış sistemler bu olayları dinleyebilir, istekleri işleyebilir ve yanıtları iş akışına geri gönderebilir. Çerçeve, yanıtları özgün isteğe göre otomatik olarak uygun yürütücüye yönlendirir.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Giriş bağlantı noktasını bir iş akışına ekleyin.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

tanımının JudgeExecutor bir hedef sayıya ihtiyacı vardır ve tahminin doğru olup olmadığını değerlendirebilmesi gerekir. Doğru değilse, RequestPort aracılığıyla yeni bir tahmin istemek için başka bir istek gönderir.

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

Python'da yürütücüler, ctx.request_info() kullanarak istek gönderir ve @response_handler dekoratörünü kullanarak yanıtları işler.

Şimdi bir insan operatörden bir sayıyı tahmin etmelerini isteyen ve tahminin doğru olup olmadığını yargılamak için yürütücü kullanan bir iş akışı oluşturalım.

İş Akışında İstek ve Yanıt İşlemeyi Etkinleştirme

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

Dekoratör, @response_handler belirtilen istek ve yanıt türlerine yönelik yanıtları işlemek için yöntemi otomatik olarak kaydeder. Çerçeve, original_request ve response parametrelerinin tür ek açıklamalarına göre gelen yanıtları doğru işleyiciyle eşleştirir.

İstekleri ve Yanıtları İşleme

Bir RequestPort, bir istek aldığında bir RequestInfoEvent yayar. İş akışından gelen istekleri işlemek için bu olaylara abone olabilirsiniz. Bir dış sistemden yanıt aldığınızda, yanıt mekanizmasını kullanarak bu yanıtı iş akışına geri gönderin. Çerçeve, yanıtı otomatik olarak özgün isteği gönderen yürütücüye yönlendirir.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Tip

Çalıştırılabilir projenin tamamı için tam örneğe bakın.

Yürütücüler ayrı bir bileşene gerek kalmadan doğrudan istek gönderebilir. Bir yürütücü ctx.request_info() çağırdığında, iş akışı WorkflowEvent ile bir type == "request_info" yayar. İş akışından gelen istekleri işlemek için bu olaylara abone olabilirsiniz. Bir dış sistemden yanıt aldığınızda, yanıt mekanizmasını kullanarak bu yanıtı iş akışına geri gönderin. Çerçeve, yanıtı yürütücüsü @response_handler yöntemine otomatik olarak yönlendirir.

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Tip

Tam bir çalıştırılabilir dosya için bu tam örneğe bakın.

Döngüde İnsan ile Ajan Düzenlemeleri

RequestPort Yukarıda açıklanan desen özel yürütücüler ve WorkflowBuilderile çalışır. Aracı orkestrasyonları (sıralı, eşzamanlı veya grup sohbeti iş akışları gibi) kullanırken, insanı döngüye dahil etme isteği/yanıt mekanizması aracılığıyla araç onayı sağlanır.

Aracılar yürütmeden önce insan onayı gerektiren araçları kullanabilir. Aracı, onay gerektiren bir aracı çağırmaya çalıştığında, iş akışı duraklatılır ve desen ile aynı şekilde bir RequestInfoEvent yayar, ancak olay yükü özel istek türü yerine bir ToolApprovalRequestContent (C#) veya Content ile type == "function_approval_request" (Python) içerir.

Denetim Noktaları ve İstekler

Denetim noktaları hakkında daha fazla bilgi edinmek için bkz. Denetim noktaları.

Bir denetim noktası oluşturulduğunda, bekleyen istekler de denetim noktası durumunun bir parçası olarak kaydedilir. Bir denetim noktasından geri yükleme yaptığınızda, bekleyen istekler yeniden nesne halinde RequestInfoEvent yayımlanarak bunları yakalayabilmenizi ve onlara yanıt verebilmenizi sağlar. Sürdürme işlemi sırasında yanıtları doğrudan sağlayamazsınız; bunun yerine, yeniden yayılan olayları dinlemeniz ve standart yanıt mekanizmasını kullanarak yanıt vermeniz gerekir.

Sonraki Adımlar