Udostępnij za pośrednictwem


Przepływy pracy programu Microsoft Agent Framework — human-in-the-loop (HITL)

Ta strona zawiera omówienie interakcji człowiek-w-pętli (HITL) w systemie przepływu pracy systemu Microsoft Agent Framework. Funkcja HITL jest osiągana za pomocą mechanizmu obsługi żądań i odpowiedzi w przepływach pracy, który umożliwia wykonawcom wysyłanie żądań do systemów zewnętrznych (takich jak operatorzy ludzki) i oczekiwanie na odpowiedzi przed kontynuowaniem wykonywania przepływu pracy.

Przegląd

Wykonawcy w przepływie pracy mogą wysyłać żądania na zewnątrz przepływu pracy i czekać na odpowiedzi. Jest to przydatne w przypadku scenariuszy, w których funkcja wykonawcza musi wchodzić w interakcje z systemami zewnętrznymi, takimi jak interakcje typu human-in-the-loop lub inne operacje asynchroniczne.

Utwórzmy przepływ pracy, który pyta ludzkiego operatora o odgadnięcie liczby i używa wykonawcy, aby ocenić, czy odgadnięcie jest prawidłowe.

Włączanie obsługi żądań i odpowiedzi w przepływie pracy

Żądania i odpowiedzi są obsługiwane za pośrednictwem specjalnego typu o nazwie RequestPort.

A RequestPort to kanał komunikacyjny, który umożliwia wykonawcom wysyłanie żądań i odbieranie odpowiedzi. Gdy wykonawca wysyła komunikat do RequestPortobiektu, port żądania emituje RequestInfoEvent, który zawiera szczegóły żądania. Systemy zewnętrzne mogą nasłuchiwać tych zdarzeń, przetwarzać żądania i wysyłać odpowiedzi z powrotem do przepływu pracy. Platforma automatycznie kieruje odpowiedzi z powrotem do odpowiedniego wykonawcy na podstawie oryginalnego żądania.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Dodaj port wejściowy do przepływu pracy.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

Definicja JudgeExecutor musi mieć liczbę docelową i być w stanie ocenić, czy odgadnięcie jest poprawne. Jeśli to nie jest poprawne, wyśle kolejne żądanie, aby poprosić o nowe odgadnięcie za pośrednictwem .RequestPort

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

W języku Python, wykonawcy wysyłają żądania przy użyciu ctx.request_info() i obsługują odpowiedzi przy użyciu dekoratora @response_handler.

Utwórzmy przepływ pracy, który pyta ludzkiego operatora o odgadnięcie liczby i używa wykonawcy, aby ocenić, czy odgadnięcie jest prawidłowe.

Włączanie obsługi żądań i odpowiedzi w przepływie pracy

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

Dekorator @response_handler automatycznie rejestruje metodę do obsługi odpowiedzi dla określonych typów żądań i odpowiedzi. Framework dopasowuje przychodzące odpowiedzi do poprawnego programu obsługi w oparciu o adnotacje typu parametrów original_request i response.

Obsługa żądań i odpowiedzi

Element RequestPort emituje RequestInfoEvent gdy otrzyma żądanie. Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź do funkcji wykonawczej, która wysłała oryginalne żądanie.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Tip

Zapoznaj się z pełnym przykładem kompletnego projektu możliwego do uruchomienia.

Funkcje wykonawcze mogą wysyłać żądania bezpośrednio bez konieczności użycia oddzielnego składnika. Gdy wykonawca wywołuje ctx.request_info(), przepływ pracy emituje element WorkflowEvent z parametrem type == "request_info". Możesz zapisać się na te zdarzenia w celu obsługi żądań przychodzących z procesu pracy. Gdy otrzymasz odpowiedź z systemu zewnętrznego, wyślij ją z powrotem do przepływu pracy przy użyciu mechanizmu odpowiedzi. Platforma automatycznie kieruje odpowiedź na metodę wykonawcy @response_handler .

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Tip

Zobacz ten pełny przykład , aby uzyskać pełny plik, który można uruchomić.

Punkty kontrolne i żądania

Aby dowiedzieć się więcej na temat punktów kontrolnych, zobacz Punkty kontrolne.

Po utworzeniu punktu kontrolnego oczekujące żądania są również zapisywane jako część stanu punktu kontrolnego. Po przywróceniu z punktu kontrolnego wszystkie oczekujące żądania będą wysyłane ponownie jako RequestInfoEvent obiekty, co umożliwia przechwytywanie i reagowanie na nie. Nie można podać odpowiedzi bezpośrednio podczas operacji wznawiania — zamiast tego należy nasłuchiwać ponownie emitowanych zdarzeń i reagować przy użyciu standardowego mechanizmu odpowiedzi.

Dalsze kroki