Бөлісу құралы:


Рабочие процессы Microsoft Agent Framework — "Человек в процессе" (HITL)

На этой странице представлен обзор взаимодействия "Человек в цикле" (HITL) в системе рабочих процессов Microsoft Agent Framework. HITL достигается с помощью механизма обработки запросов и ответа в рабочих процессах, что позволяет исполнителям отправлять запросы во внешние системы (например, операторы человека) и ждать их ответов, прежде чем продолжить выполнение рабочего процесса.

Обзор

Исполнители рабочего процесса могут отправлять запросы за пределы рабочего процесса и ожидать ответов. Это полезно для сценариев, когда исполнитель должен взаимодействовать с внешними системами, такими как взаимодействие с человеком в цикле или любые другие асинхронные операции.

Давайте создадим рабочий процесс, который просит человеческого оператора угадать число и использует исполнителя, чтобы судить, правильно ли угадывание.

Включение обработки запросов и ответов в рабочем процессе

Запросы и ответы обрабатываются с помощью специального типа RequestPort.

Это RequestPort канал связи, позволяющий исполнителям отправлять запросы и получать ответы. Когда исполнитель отправляет сообщение RequestPort, порт запроса излучает RequestInfoEvent, содержащее сведения о запросе. Внешние системы могут прослушивать эти события, обрабатывать запросы и отправлять ответы обратно в рабочий процесс. Фреймворк автоматически перенаправляет ответы обратно соответствующему исполнителю на основе исходного запроса.

// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Добавьте входной порт в рабочий процесс.

JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
    .AddEdge(numberRequestPort, judgeExecutor)
    .AddEdge(judgeExecutor, numberRequestPort)
    .WithOutputFrom(judgeExecutor)
    .Build();

Для определения цели JudgeExecutor требуется заданное число и возможность оценивать правильность предположения. Если это неправильно, он отправит еще один запрос, чтобы запросить новое предположение через RequestPort.

internal enum NumberSignal
{
    Init,
    Above,
    Below,
}

internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        this._targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        this._tries++;
        if (message == this._targetNumber)
        {
            await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
        }
        else if (message < this._targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
        }
    }
}

В Python исполнители отправляют запросы с помощью ctx.request_info(), а обрабатывают ответы с помощью @response_handler декоратора.

Давайте создадим рабочий процесс, который просит человеческого оператора угадать число и использует исполнителя, чтобы судить, правильно ли угадывание.

Включение обработки запросов и ответов в рабочем процессе

from dataclasses import dataclass

from agent_framework import (
    Executor,
    WorkflowBuilder,
    WorkflowContext,
    handler,
    response_handler,
)


@dataclass
class NumberSignal:
    hint: str  # "init", "above", or "below"


class JudgeExecutor(Executor):
    def __init__(self, target_number: int):
        super().__init__(id="judge")
        self._target_number = target_number
        self._tries = 0

    @handler
    async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
        self._tries += 1
        if guess == self._target_number:
            await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
        elif guess < self._target_number:
            await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
        else:
            await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)

    @response_handler
    async def on_human_response(
        self,
        original_request: NumberSignal,
        response: int,
        ctx: WorkflowContext[int, str],
    ) -> None:
        await self.handle_guess(response, ctx)


judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()

Декоратор @response_handler автоматически регистрирует метод для обработки ответов для указанных типов запросов и ответов. Фреймворк сопоставляет входящие ответы с правильным обработчиком на основе аннотаций типов параметров original_request и response.

Обработка запросов и ответов

RequestPort излучает RequestInfoEvent при получении запроса. Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ исполнителям, отправляющим исходный запрос.

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            int guess = ...; // Get the guess from the human operator or any external system
            await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
            break;

        case WorkflowOutputEvent outputEvt:
            // The workflow has yielded output
            Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
            return;
    }
}

Подсказка

См. полный образец для завершённого запускаемого проекта.

Исполнители могут отправлять запросы напрямую, не нуждаясь в отдельном компоненте. При вызове ctx.request_info() исполняющим механизмом рабочий процесс выдает WorkflowEvent с type == "request_info". Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ на метод исполнителя @response_handler .

from collections.abc import AsyncIterable

from agent_framework import WorkflowEvent


async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
    """Process events from the workflow stream to capture requests."""
    requests: list[tuple[str, NumberSignal]] = []
    async for event in stream:
        if event.type == "request_info":
            requests.append((event.request_id, event.data))

    # Handle any pending human feedback requests.
    if requests:
        responses: dict[str, int] = {}
        for request_id, request in requests:
            guess = ...  # Get the guess from the human operator or any external system.
            responses[request_id] = guess
        return responses

    return None

# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)

pending_responses = await process_event_stream(stream)
while pending_responses is not None:
    # Run the workflow until there is no more human feedback to provide,
    # in which case this workflow completes.
    stream = workflow.run(stream=True, responses=pending_responses)
    pending_responses = await process_event_stream(stream)

Подсказка

См. полный пример полного запускаемого файла.

Человек в цикле управления с координацией агентов

Описанный RequestPort выше шаблон работает с пользовательскими исполнителями и WorkflowBuilder. При использовании оркестрации агентов (таких как последовательные, одновременные или групповые рабочие процессы чата), утверждение средства достигается с помощью механизма запроса и ответа в цикле.

Агенты могут использовать инструменты, которые требуют одобрения человеком перед выполнением. Когда агент пытается вызвать инструмент, требующий утверждения, рабочий процесс приостанавливается и выдает RequestInfoEvent, как в шаблоне RequestPort, но полезная нагрузка события содержит ToolApprovalRequestContent для C# или Content вместе с type == "function_approval_request" для Python вместо пользовательского типа запроса.

Контрольные точки и запросы

Дополнительные сведения о контрольных точках см. в разделе "Контрольные точки".

При создании контрольной точки ожидающие запросы также сохраняются в составе состояния контрольной точки. При восстановлении из контрольной точки все ожидающие запросы будут повторно выдаваться в виде RequestInfoEvent объектов, что позволяет записывать и реагировать на них. Вы не можете предоставить ответы непосредственно во время возобновления. Вместо этого вы должны слушать повторно сгенерированные события и реагировать с помощью стандартного механизма ответа.

Дальнейшие шаги