Notatka
Dostęp do tej strony wymaga autoryzacji. Może spróbować zalogować się lub zmienić katalogi.
Dostęp do tej strony wymaga autoryzacji. Możesz spróbować zmienić katalogi.
Ta strona zawiera omówienie interakcji człowiek-w-pętli (HITL) w systemie przepływu pracy systemu Microsoft Agent Framework. Funkcja HITL jest osiągana za pomocą mechanizmu obsługi żądań i odpowiedzi w przepływach pracy, który umożliwia wykonawcom wysyłanie żądań do systemów zewnętrznych (takich jak operatorzy ludzki) i oczekiwanie na odpowiedzi przed kontynuowaniem wykonywania przepływu pracy.
Przegląd
Wykonawcy w przepływie pracy mogą wysyłać żądania na zewnątrz przepływu pracy i czekać na odpowiedzi. Jest to przydatne w przypadku scenariuszy, w których funkcja wykonawcza musi wchodzić w interakcje z systemami zewnętrznymi, takimi jak interakcje typu human-in-the-loop lub inne operacje asynchroniczne.
Utwórzmy przepływ pracy, który pyta ludzkiego operatora o odgadnięcie liczby i używa wykonawcy, aby ocenić, czy odgadnięcie jest prawidłowe.
Włączanie obsługi żądań i odpowiedzi w przepływie pracy
Żądania i odpowiedzi są obsługiwane za pośrednictwem specjalnego typu o nazwie RequestPort.
A RequestPort to kanał komunikacyjny, który umożliwia wykonawcom wysyłanie żądań i odbieranie odpowiedzi. Gdy wykonawca wysyła komunikat do RequestPortobiektu, port żądania emituje RequestInfoEvent, który zawiera szczegóły żądania. Systemy zewnętrzne mogą nasłuchiwać tych zdarzeń, przetwarzać żądania i wysyłać odpowiedzi z powrotem do przepływu pracy. Platforma automatycznie kieruje odpowiedzi z powrotem do odpowiedniego wykonawcy na podstawie oryginalnego żądania.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Dodaj port wejściowy do przepływu pracy.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
Definicja JudgeExecutor musi mieć liczbę docelową i być w stanie ocenić, czy odgadnięcie jest poprawne. Jeśli to nie jest poprawne, wyśle kolejne żądanie, aby poprosić o nowe odgadnięcie za pośrednictwem .RequestPort
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
W języku Python, wykonawcy wysyłają żądania przy użyciu ctx.request_info() i obsługują odpowiedzi przy użyciu dekoratora @response_handler.
Utwórzmy przepływ pracy, który pyta ludzkiego operatora o odgadnięcie liczby i używa wykonawcy, aby ocenić, czy odgadnięcie jest prawidłowe.
Włączanie obsługi żądań i odpowiedzi w przepływie pracy
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
Dekorator @response_handler automatycznie rejestruje metodę do obsługi odpowiedzi dla określonych typów żądań i odpowiedzi. Framework dopasowuje przychodzące odpowiedzi do poprawnego programu obsługi w oparciu o adnotacje typu parametrów original_request i response.
Obsługa żądań i odpowiedzi
Element RequestPort emituje RequestInfoEvent gdy otrzyma żądanie. Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź do funkcji wykonawczej, która wysłała oryginalne żądanie.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Tip
Zapoznaj się z pełnym przykładem kompletnego projektu możliwego do uruchomienia.
Funkcje wykonawcze mogą wysyłać żądania bezpośrednio bez konieczności użycia oddzielnego składnika. Gdy wykonawca wywołuje ctx.request_info(), przepływ pracy emituje element WorkflowEvent z parametrem type == "request_info". Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź na metodę wykonawcy @response_handler .
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Tip
Zobacz ten pełny przykład , aby uzyskać pełny plik, który można uruchomić.
Punkty kontrolne i żądania
Aby dowiedzieć się więcej na temat punktów kontrolnych, zobacz Punkty kontrolne.
Po utworzeniu punktu kontrolnego oczekujące żądania są również zapisywane jako część stanu punktu kontrolnego. Po przywróceniu z punktu kontrolnego wszystkie oczekujące żądania będą wysyłane ponownie jako RequestInfoEvent obiekty, co umożliwia przechwytywanie i reagowanie na nie. Nie można podać odpowiedzi bezpośrednio podczas operacji wznawiania — zamiast tego należy nasłuchiwać ponownie emitowanych zdarzeń i reagować przy użyciu standardowego mechanizmu odpowiedzi.