Sdílet prostřednictvím


Pracovní postupy rozhraní Microsoft Agent Framework – human-in-the-loop (HITL)

Tato stránka obsahuje přehled interakcí HITL (Human-in-the-loop) v systému pracovních postupů rozhraní Microsoft Agent Framework. HITL se dosahuje prostřednictvím mechanismu zpracování požadavků a odpovědí v pracovních postupech, který exekutorům umožňuje odesílat žádosti do externích systémů (jako jsou lidské operátory) a čekat na odpovědi, než budou pokračovat v provádění pracovního postupu.

Přehled

Exekutory v pracovním postupu můžou odesílat požadavky mimo pracovní postup a čekat na odpovědi. To je užitečné ve scénářích, kdy exekutor potřebuje pracovat s externími systémy, jako jsou interakce mezi lidmi ve smyčce nebo jakékoli jiné asynchronní operace.

Pojďme vytvořit pracovní postup, který požádá operátora člověka, aby odhadl číslo a pomocí exekutoru posoudil, jestli je odhad správný.

Povolení zpracování požadavků a odpovědí v pracovním postupu

Požadavky a odpovědi se zpracovávají prostřednictvím speciálního typu volaný RequestPort.

A RequestPort je komunikační kanál, který exekutorům umožňuje odesílat žádosti a přijímat odpovědi. Když exekutor odešle zprávu na adresu RequestPort, port požadavku vygeneruje RequestInfoEvent , který obsahuje podrobnosti požadavku. Externí systémy mohou naslouchat těmto událostem, zpracovávat požadavky a odesílat odpovědi zpět do pracovního postupu. Architektura automaticky směruje odpovědi zpět do příslušného exekutoru na základě původního požadavku.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Přidejte vstupní port do pracovního postupu.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

Definice JudgeExecutor potřebuje cílové číslo a být schopná posoudit, jestli je odhad správný. Pokud není správný, odešle další žádost o nový odhad prostřednictvím RequestPort.

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

V Pythonu vykonavatelé odesílají požadavky pomocí ctx.request_info() a zpracovávají odpovědi pomocí dekorátoru @response_handler.

Pojďme vytvořit pracovní postup, který požádá operátora člověka, aby odhadl číslo a pomocí exekutoru posoudil, jestli je odhad správný.

Povolení zpracování požadavků a odpovědí v pracovním postupu

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

Dekorátor @response_handler automaticky zaregistruje metodu pro zpracování odpovědí pro zadané typy požadavků a odpovědí. Rámec přiřazuje příchozí odpovědi ke správné obslužné rutině na základě typových anotací parametrů original_request a response.

Zpracování žádostí a odpovědí

RequestPort vyšle RequestInfoEvent při obdržení požadavku. Můžete se přihlásit k odběru těchto událostí, abyste mohli zpracovávat příchozí požadavky z pracovního postupu. Když obdržíte odpověď z externího systému, odešlete ji zpět do pracovního postupu pomocí mechanismu odpovědi. Architektura automaticky směruje odpověď na exekutor, který odeslal původní požadavek.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Návod

Podívejte se na úplnou ukázku kompletního runnable projektu.

Exekutory můžou odesílat požadavky přímo bez nutnosti samostatné komponenty. Při volání exekutorem ctx.request_info() pracovní postup vygeneruje WorkflowEvent s type == "request_info". Můžete se přihlásit k odběru těchto událostí, abyste mohli zpracovávat příchozí požadavky z pracovního postupu. Když obdržíte odpověď z externího systému, odešlete ji zpět do pracovního postupu pomocí mechanismu odpovědi. Architektura automaticky směruje odpověď na metodu exekutoru @response_handler .

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Návod

Podívejte se na tuto úplnou ukázku kompletního spustitelného souboru.

Člověk ve smyčce s orchestrací agentů

Výše popsaný vzorec funguje s vlastními vykonavateli a WorkflowBuilder. Při použití orchestrací agentů (například sekvenčních, souběžných nebo skupinových chatovacích pracovních postupů) se schválení nástroje dosahuje prostřednictvím mechanismu požadavku a odpovědi s člověkem v centru dění.

Agenti můžou používat nástroje, které před provedením vyžadují schválení člověkem. Když se agent pokusí volat nástroj potřebný ke schválení, pracovní postup se pozastaví a vygeneruje RequestInfoEvent stejně jako vzorek RequestPort , ale datová část události obsahuje ToolApprovalRequestContent (C#) nebo Content s type == "function_approval_request" (Python) místo vlastního typu požadavku.

Kontrolní body a požadavky

Další informace o kontrolních bodech najdete v tématu Kontrolní body.

Při vytvoření kontrolního bodu se čekající požadavky uloží také jako součást stavu kontrolního bodu. Když provedete obnovení z kontrolního bodu, všechny čekající požadavky se znovu vygenerují jako RequestInfoEvent objekty, což vám umožní zachytit je a reagovat na ně. Odpovědi nelze zadat přímo během operace obnovení – místo toho je nutné naslouchat znovu vygenerovaným událostem a reagovat pomocí standardního mechanismu odpovědi.

Další kroky