Poznámka:
Přístup k této stránce vyžaduje autorizaci. Můžete se zkusit přihlásit nebo změnit adresáře.
Přístup k této stránce vyžaduje autorizaci. Můžete zkusit změnit adresáře.
Tato stránka obsahuje přehled interakcí HITL (Human-in-the-loop) v systému pracovních postupů rozhraní Microsoft Agent Framework. HITL se dosahuje prostřednictvím mechanismu zpracování požadavků a odpovědí v pracovních postupech, který exekutorům umožňuje odesílat žádosti do externích systémů (jako jsou lidské operátory) a čekat na odpovědi, než budou pokračovat v provádění pracovního postupu.
Přehled
Exekutory v pracovním postupu můžou odesílat požadavky mimo pracovní postup a čekat na odpovědi. To je užitečné ve scénářích, kdy exekutor potřebuje pracovat s externími systémy, jako jsou interakce mezi lidmi ve smyčce nebo jakékoli jiné asynchronní operace.
Pojďme vytvořit pracovní postup, který požádá operátora člověka, aby odhadl číslo a pomocí exekutoru posoudil, jestli je odhad správný.
Povolení zpracování požadavků a odpovědí v pracovním postupu
Požadavky a odpovědi se zpracovávají prostřednictvím speciálního typu volaný RequestPort.
A RequestPort je komunikační kanál, který exekutorům umožňuje odesílat žádosti a přijímat odpovědi. Když exekutor odešle zprávu na adresu RequestPort, port požadavku vygeneruje RequestInfoEvent , který obsahuje podrobnosti požadavku. Externí systémy mohou naslouchat těmto událostem, zpracovávat požadavky a odesílat odpovědi zpět do pracovního postupu. Architektura automaticky směruje odpovědi zpět do příslušného exekutoru na základě původního požadavku.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Přidejte vstupní port do pracovního postupu.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
Definice JudgeExecutor potřebuje cílové číslo a být schopná posoudit, jestli je odhad správný. Pokud není správný, odešle další žádost o nový odhad prostřednictvím RequestPort.
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
V Pythonu vykonavatelé odesílají požadavky pomocí ctx.request_info() a zpracovávají odpovědi pomocí dekorátoru @response_handler.
Pojďme vytvořit pracovní postup, který požádá operátora člověka, aby odhadl číslo a pomocí exekutoru posoudil, jestli je odhad správný.
Povolení zpracování požadavků a odpovědí v pracovním postupu
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
Dekorátor @response_handler automaticky zaregistruje metodu pro zpracování odpovědí pro zadané typy požadavků a odpovědí. Rámec přiřazuje příchozí odpovědi ke správné obslužné rutině na základě typových anotací parametrů original_request a response.
Zpracování žádostí a odpovědí
RequestPort vyšle RequestInfoEvent při obdržení požadavku. Můžete se přihlásit k odběru těchto událostí, abyste mohli zpracovávat příchozí požadavky z pracovního postupu. Když obdržíte odpověď z externího systému, odešlete ji zpět do pracovního postupu pomocí mechanismu odpovědi. Architektura automaticky směruje odpověď na exekutor, který odeslal původní požadavek.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Návod
Podívejte se na úplnou ukázku kompletního runnable projektu.
Exekutory můžou odesílat požadavky přímo bez nutnosti samostatné komponenty. Při volání exekutorem ctx.request_info() pracovní postup vygeneruje WorkflowEvent s type == "request_info". Můžete se přihlásit k odběru těchto událostí, abyste mohli zpracovávat příchozí požadavky z pracovního postupu. Když obdržíte odpověď z externího systému, odešlete ji zpět do pracovního postupu pomocí mechanismu odpovědi. Architektura automaticky směruje odpověď na metodu exekutoru @response_handler .
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Návod
Podívejte se na tuto úplnou ukázku kompletního spustitelného souboru.
Člověk ve smyčce s orchestrací agentů
Výše popsaný vzorec funguje s vlastními vykonavateli a WorkflowBuilder. Při použití orchestrací agentů (například sekvenčních, souběžných nebo skupinových chatovacích pracovních postupů) se schválení nástroje dosahuje prostřednictvím mechanismu požadavku a odpovědi s člověkem v centru dění.
Agenti můžou používat nástroje, které před provedením vyžadují schválení člověkem. Když se agent pokusí volat nástroj potřebný ke schválení, pracovní postup se pozastaví a vygeneruje RequestInfoEvent stejně jako vzorek RequestPort , ale datová část události obsahuje ToolApprovalRequestContent (C#) nebo Content s type == "function_approval_request" (Python) místo vlastního typu požadavku.
Návod
Kompletní příklady s kódem najdete tady:
Kontrolní body a požadavky
Další informace o kontrolních bodech najdete v tématu Kontrolní body.
Při vytvoření kontrolního bodu se čekající požadavky uloží také jako součást stavu kontrolního bodu. Když provedete obnovení z kontrolního bodu, všechny čekající požadavky se znovu vygenerují jako RequestInfoEvent objekty, což vám umožní zachytit je a reagovat na ně. Odpovědi nelze zadat přímo během operace obnovení – místo toho je nutné naslouchat znovu vygenerovaným událostem a reagovat pomocí standardního mechanismu odpovědi.