Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
На этой странице представлен обзор взаимодействия "Человек в цикле" (HITL) в системе рабочих процессов Microsoft Agent Framework. HITL достигается с помощью механизма обработки запросов и ответа в рабочих процессах, что позволяет исполнителям отправлять запросы во внешние системы (например, операторы человека) и ждать их ответов, прежде чем продолжить выполнение рабочего процесса.
Обзор
Исполнители рабочего процесса могут отправлять запросы за пределы рабочего процесса и ожидать ответов. Это полезно для сценариев, когда исполнитель должен взаимодействовать с внешними системами, такими как взаимодействие с человеком в цикле или любые другие асинхронные операции.
Давайте создадим рабочий процесс, который просит человеческого оператора угадать число и использует исполнителя, чтобы судить, правильно ли угадывание.
Включение обработки запросов и ответов в рабочем процессе
Запросы и ответы обрабатываются с помощью специального типа RequestPort.
Это RequestPort канал связи, позволяющий исполнителям отправлять запросы и получать ответы. Когда исполнитель отправляет сообщение RequestPort, порт запроса излучает RequestInfoEvent, содержащее сведения о запросе. Внешние системы могут прослушивать эти события, обрабатывать запросы и отправлять ответы обратно в рабочий процесс. Фреймворк автоматически перенаправляет ответы обратно соответствующему исполнителю на основе исходного запроса.
// Create a request port that receives requests of type NumberSignal and responses of type int.
var numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Добавьте входной порт в рабочий процесс.
JudgeExecutor judgeExecutor = new(42);
var workflow = new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.Build();
Для определения цели JudgeExecutor требуется заданное число и возможность оценивать правильность предположения. Если это неправильно, он отправит еще один запрос, чтобы запросить новое предположение через RequestPort.
internal enum NumberSignal
{
Init,
Above,
Below,
}
internal sealed class JudgeExecutor() : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
this._targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken = default)
{
this._tries++;
if (message == this._targetNumber)
{
await context.YieldOutputAsync($"{this._targetNumber} found in {this._tries} tries!", cancellationToken);
}
else if (message < this._targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken: cancellationToken);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken: cancellationToken);
}
}
}
В Python исполнители отправляют запросы с помощью ctx.request_info(), а обрабатывают ответы с помощью @response_handler декоратора.
Давайте создадим рабочий процесс, который просит человеческого оператора угадать число и использует исполнителя, чтобы судить, правильно ли угадывание.
Включение обработки запросов и ответов в рабочем процессе
from dataclasses import dataclass
from agent_framework import (
Executor,
WorkflowBuilder,
WorkflowContext,
handler,
response_handler,
)
@dataclass
class NumberSignal:
hint: str # "init", "above", or "below"
class JudgeExecutor(Executor):
def __init__(self, target_number: int):
super().__init__(id="judge")
self._target_number = target_number
self._tries = 0
@handler
async def handle_guess(self, guess: int, ctx: WorkflowContext[int, str]) -> None:
self._tries += 1
if guess == self._target_number:
await ctx.yield_output(f"{self._target_number} found in {self._tries} tries!")
elif guess < self._target_number:
await ctx.request_info(request_data=NumberSignal(hint="below"), response_type=int)
else:
await ctx.request_info(request_data=NumberSignal(hint="above"), response_type=int)
@response_handler
async def on_human_response(
self,
original_request: NumberSignal,
response: int,
ctx: WorkflowContext[int, str],
) -> None:
await self.handle_guess(response, ctx)
judge = JudgeExecutor(target_number=42)
workflow = WorkflowBuilder(start_executor=judge).build()
Декоратор @response_handler автоматически регистрирует метод для обработки ответов для указанных типов запросов и ответов. Фреймворк сопоставляет входящие ответы с правильным обработчиком на основе аннотаций типов параметров original_request и response.
Обработка запросов и ответов
RequestPort излучает RequestInfoEvent при получении запроса. Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ исполнителям, отправляющим исходный запрос.
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(workflow, NumberSignal.Init);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle `RequestInfoEvent` from the workflow
int guess = ...; // Get the guess from the human operator or any external system
await handle.SendResponseAsync(requestInputEvt.Request.CreateResponse(guess));
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
Подсказка
См. полный образец для завершённого запускаемого проекта.
Исполнители могут отправлять запросы напрямую, не нуждаясь в отдельном компоненте. При вызове ctx.request_info() исполняющим механизмом рабочий процесс выдает WorkflowEvent с type == "request_info". Вы можете подписаться на эти события для обработки входящих запросов из рабочего процесса. При получении ответа от внешней системы отправьте его обратно в рабочий процесс с помощью механизма ответа. Платформа автоматически направляет ответ на метод исполнителя @response_handler .
from collections.abc import AsyncIterable
from agent_framework import WorkflowEvent
async def process_event_stream(stream: AsyncIterable[WorkflowEvent]) -> dict[str, int] | None:
"""Process events from the workflow stream to capture requests."""
requests: list[tuple[str, NumberSignal]] = []
async for event in stream:
if event.type == "request_info":
requests.append((event.request_id, event.data))
# Handle any pending human feedback requests.
if requests:
responses: dict[str, int] = {}
for request_id, request in requests:
guess = ... # Get the guess from the human operator or any external system.
responses[request_id] = guess
return responses
return None
# Initiate the first run of the workflow with an initial guess.
# Runs are not isolated; state is preserved across multiple calls to run.
stream = workflow.run(25, stream=True)
pending_responses = await process_event_stream(stream)
while pending_responses is not None:
# Run the workflow until there is no more human feedback to provide,
# in which case this workflow completes.
stream = workflow.run(stream=True, responses=pending_responses)
pending_responses = await process_event_stream(stream)
Подсказка
См. полный пример полного запускаемого файла.
Человек в цикле управления с координацией агентов
Описанный RequestPort выше шаблон работает с пользовательскими исполнителями и WorkflowBuilder. При использовании оркестрации агентов (таких как последовательные, одновременные или групповые рабочие процессы чата), утверждение средства достигается с помощью механизма запроса и ответа в цикле.
Агенты могут использовать инструменты, которые требуют одобрения человеком перед выполнением. Когда агент пытается вызвать инструмент, требующий утверждения, рабочий процесс приостанавливается и выдает RequestInfoEvent, как в шаблоне RequestPort, но полезная нагрузка события содержит ToolApprovalRequestContent для C# или Content вместе с type == "function_approval_request" для Python вместо пользовательского типа запроса.
Подсказка
Полные примеры кода см. в следующих примерах:
Контрольные точки и запросы
Дополнительные сведения о контрольных точках см. в разделе "Контрольные точки".
При создании контрольной точки ожидающие запросы также сохраняются в составе состояния контрольной точки. При восстановлении из контрольной точки все ожидающие запросы будут повторно выдаваться в виде RequestInfoEvent объектов, что позволяет записывать и реагировать на них. Вы не можете предоставить ответы непосредственно во время возобновления. Вместо этого вы должны слушать повторно сгенерированные события и реагировать с помощью стандартного механизма ответа.