Примечание.
Для доступа к этой странице требуется авторизация. Вы можете попробовать войти или изменить каталоги.
Для доступа к этой странице требуется авторизация. Вы можете попробовать изменить каталоги.
В этом руководстве показано, как обрабатывать запросы и ответы в рабочих процессах с помощью рабочих процессов Agent Framework. Вы узнаете, как создать интерактивные рабочие процессы, которые могут приостановить выполнение для запроса входных данных из внешних источников (например, людей или других систем), а затем возобновить работу после предоставления ответа.
Основные понятия, описанные в статье
В .NET рабочие процессы с участием человека используют RequestPort и внешнее управление запросами для приостановки выполнения операций и сбора пользовательского ввода. Этот шаблон поддерживает интерактивные рабочие процессы, в которых система может запрашивать информацию из внешних источников во время выполнения.
Предпосылки
- Пакет SDK для .NET 8.0 или более поздней версии.
- Конечная точка и развертывание службы Azure OpenAI настроены.
- Azure CLI установлен и прошел проверку подлинности (для проверки подлинности учетных данных Azure).
- Базовое понимание программирования C# и асинхронного программирования.
- Новое консольное приложение.
Установка пакетов Nuget
Сначала установите необходимые пакеты для проекта .NET:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Ключевые компоненты
RequestPort и внешние запросы
Элемент RequestPort выступает в качестве моста между рабочим процессом и внешними источниками входных данных. Когда рабочий процесс требует ввода данных, он генерирует RequestInfoEvent, который обрабатывает ваше приложение.
// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
Типы сигналов
Определите типы сигналов для взаимодействия с различными типами запросов:
/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
Init, // Initial guess request
Above, // Previous guess was too high
Below, // Previous guess was too low
}
Исполнитель рабочего процесса
Создайте исполнителя, обрабатывающие входные данные пользователя и предоставляющие отзыв:
/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
private readonly int _targetNumber;
private int _tries;
public JudgeExecutor(int targetNumber) : this()
{
_targetNumber = targetNumber;
}
public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
{
_tries++;
if (message == _targetNumber)
{
await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
.ConfigureAwait(false);
}
else if (message < _targetNumber)
{
await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
}
else
{
await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
}
}
}
Создание рабочего процесса
Подключите «requestPort» и исполнителя в обратную цепь:
internal static class WorkflowHelper
{
internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
{
// Create the executors
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
JudgeExecutor judgeExecutor = new(42);
// Build the workflow by connecting executors in a loop
return new WorkflowBuilder(numberRequestPort)
.AddEdge(numberRequestPort, judgeExecutor)
.AddEdge(judgeExecutor, numberRequestPort)
.WithOutputFrom(judgeExecutor)
.BuildAsync<NumberSignal>();
}
}
Выполнение интерактивного рабочего процесса
Обработка внешних запросов во время выполнения рабочего процесса:
private static async Task Main()
{
// Create the workflow
var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);
// Execute the workflow
await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
switch (evt)
{
case RequestInfoEvent requestInputEvt:
// Handle human input request from the workflow
ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
await handle.SendResponseAsync(response).ConfigureAwait(false);
break;
case WorkflowOutputEvent outputEvt:
// The workflow has yielded output
Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
return;
}
}
}
Обработка запросов
Обработка различных типов входных запросов:
private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
switch (request.DataAs<NumberSignal?>())
{
case NumberSignal.Init:
int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
return request.CreateResponse(initialGuess);
case NumberSignal.Above:
int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
return request.CreateResponse(lowerGuess);
case NumberSignal.Below:
int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
return request.CreateResponse(higherGuess);
default:
throw new ArgumentException("Unexpected request type.");
}
}
private static int ReadIntegerFromConsole(string prompt)
{
while (true)
{
Console.Write(prompt);
string? input = Console.ReadLine();
if (int.TryParse(input, out int value))
{
return value;
}
Console.WriteLine("Invalid input. Please enter a valid integer.");
}
}
Основные понятия реализации
Поток данных RequestInfoEvent
- Выполнение рабочего процесса: процесс выполняется до тех пор, пока не понадобятся внешние входные данные
-
Создание запросов: RequestPort создает
RequestInfoEventзапрос с подробными сведениями о запросе - Внешняя обработка: приложение перехватывает событие и собирает входные данные пользователя
-
Отправка ответа: верните
ExternalResponseдля продолжения рабочего процесса - Возобновление рабочего процесса: рабочий процесс продолжает обработку с предоставленными входными данными
Жизненный цикл рабочего процесса
-
Выполнение потоковой передачи. Использование
StreamAsyncдля отслеживания событий в режиме реального времени -
Обработка событий: процесс
RequestInfoEventввода запросов иWorkflowOutputEventзавершения - Координация ответов: сопоставление ответов на запросы с помощью механизма обработки ответов рабочего процесса
Поток реализации
Инициализация рабочего процесса: рабочий процесс начинается с отправки
NumberSignal.Initв RequestPort.Создание запросов: RequestPort создает
RequestInfoEventзапрос первоначального предположения от пользователя.Приостановка рабочего процесса: рабочий процесс приостанавливает и ожидает внешних входных данных, пока приложение обрабатывает запрос.
Человеческая реакция: внешнее приложение собирает входные данные пользователя и отправляет
ExternalResponseобратно в рабочий процесс.Обработка и обратная связь:
JudgeExecutorобрабатывает предположение и либо завершает рабочий процесс, либо отправляет новый сигнал (выше или ниже), чтобы запросить другое предположение.Продолжение цикла: процесс повторяется, пока не будет угадывается правильное число.
Преимущества платформы
- Безопасность типов: строгая типизация гарантирует, что контракты на запросы и ответы сохраняются
- На основе событий: система событий с расширенными возможностями обеспечивает видимость выполнения рабочего процесса.
- Приостановка выполнения: рабочие процессы могут приостановиться на неопределенный срок, ожидая внешних входных данных
- Управление состоянием: состояние рабочего процесса сохраняется в циклах приостановки и возобновления
- Гибкая интеграция: RequestPorts может интегрироваться с любым внешним источником входных данных (пользовательский интерфейс, API, консоль и т. д.)
Полный пример
Полный рабочий пример см. в примере "Человек в цикле. Основной пример".
Этот шаблон позволяет создавать сложные интерактивные приложения, где пользователи могут предоставлять входные данные в ключевых точках принятия решений в автоматизированных рабочих процессах.
Что вы будете создавать
Вы создадите интерактивный рабочий процесс определения числа, демонстрирующий шаблоны ответа на запросы:
- Агент искусственного интеллекта, который делает интеллектуальные догадки
- Исполнители, которые могут напрямую отправлять запросы с помощью
request_infoAPI - Менеджер по очередям, который координирует взаимодействие между агентом и человеком с помощью
@response_handler - Интерактивные входные и выходные данные консоли для обратной связи в режиме реального времени
Предпосылки
- Python 3.10 или более поздней версии
- Настроено развертывание Azure OpenAI
- Настроена проверка подлинности Azure CLI (
az login) - Базовое понимание асинхронного программирования Python
Основные понятия
Возможности запросов и ответов
У исполнителей есть встроенные возможности запросов и ответов, которые обеспечивают человеческое участие в процессе.
- Вызов
ctx.request_info(request_data=request_data, response_type=response_type)для отправки запросов -
@response_handlerИспользуйте декоратор для обработки ответов - Определение пользовательских типов запросов и ответов без требований к наследованию
поток запрос-ответ
Исполнители могут отправлять запросы непосредственно с помощью ctx.request_info() и обрабатывать ответы с использованием декоратора @response_handler.
- Вызовы к исполнителю
ctx.request_info(request_data=request_data, response_type=response_type) - Процесс
RequestInfoEventрабочего процесса выдает данные запроса - Внешняя система (человеческая, API и т. д.) обрабатывает запрос
- Ответ отправляется обратно через
send_responses_streaming() - Рабочий процесс возобновляет работу и предоставляет ответ на метод исполнителя
@response_handler
Настройка среды
Сначала установите необходимые пакеты:
pip install agent-framework-core --pre
pip install azure-identity
Определение моделей запросов и ответов
Начните с определения структур данных для обмена данными с запросами:
import asyncio
from dataclasses import dataclass
from pydantic import BaseModel
from agent_framework import (
AgentExecutor,
AgentExecutorRequest,
AgentExecutorResponse,
ChatMessage,
Executor,
RequestInfoEvent,
Role,
WorkflowBuilder,
WorkflowContext,
WorkflowOutputEvent,
WorkflowRunState,
WorkflowStatusEvent,
handler,
response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential
@dataclass
class HumanFeedbackRequest:
"""Request message for human feedback in the guessing game."""
prompt: str = ""
guess: int | None = None
class GuessOutput(BaseModel):
"""Structured output from the AI agent with response_format enforcement."""
guess: int
Это HumanFeedbackRequest простой класс данных для структурированных нагрузок запросов.
- Жёсткая типизация нагрузок запроса
- Проверка на совместимость с будущими версиями
- Очистка семантики корреляции с помощью ответов
- Контекстные поля (например, предыдущее предположение) для запросов с расширенным пользовательским интерфейсом
Создание диспетчера поворотов
Менеджер очередности координирует поток между ИИ-агентом и человеком.
class TurnManager(Executor):
"""Coordinates turns between the AI agent and human player.
Responsibilities:
- Start the game by requesting the agent's first guess
- Process agent responses and request human feedback
- Handle human feedback and continue the game or finish
"""
def __init__(self, id: str | None = None):
super().__init__(id=id or "turn_manager")
@handler
async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
"""Start the game by asking the agent for an initial guess."""
user = ChatMessage(Role.USER, text="Start by making your first guess.")
await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))
@handler
async def on_agent_response(
self,
result: AgentExecutorResponse,
ctx: WorkflowContext,
) -> None:
"""Handle the agent's guess and request human guidance."""
# Parse structured model output (defensive default if agent didn't reply)
text = result.agent_run_response.text or ""
last_guess = GuessOutput.model_validate_json(text).guess if text else None
# Craft a clear human prompt that defines higher/lower relative to agent's guess
prompt = (
f"The agent guessed: {last_guess if last_guess is not None else text}. "
"Type one of: higher (your number is higher than this guess), "
"lower (your number is lower than this guess), correct, or exit."
)
# Send a request using the request_info API
await ctx.request_info(
request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
response_type=str
)
@response_handler
async def on_human_feedback(
self,
original_request: HumanFeedbackRequest,
feedback: str,
ctx: WorkflowContext[AgentExecutorRequest, str],
) -> None:
"""Continue the game or finish based on human feedback."""
reply = feedback.strip().lower()
# Use the correlated request's guess to avoid extra state reads
last_guess = original_request.guess
if reply == "correct":
await ctx.yield_output(f"Guessed correctly: {last_guess}")
return
# Provide feedback to the agent for the next guess
user_msg = ChatMessage(
Role.USER,
text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
)
await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))
Создание рабочего процесса
Создайте основной рабочий процесс, который подключает все компоненты:
async def main() -> None:
# Create the chat agent with structured output enforcement
chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
agent = chat_client.as_agent(
instructions=(
"You guess a number between 1 and 10. "
"If the user says 'higher' or 'lower', adjust your next guess. "
'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
"No explanations or additional text."
),
response_format=GuessOutput,
)
# Create workflow components
turn_manager = TurnManager(id="turn_manager")
agent_exec = AgentExecutor(agent=agent, id="agent")
# Build the workflow graph
workflow = (
WorkflowBuilder()
.set_start_executor(turn_manager)
.add_edge(turn_manager, agent_exec) # Ask agent to make/adjust a guess
.add_edge(agent_exec, turn_manager) # Agent's response goes back to coordinator
.build()
)
# Execute the interactive workflow
await run_interactive_workflow(workflow)
async def run_interactive_workflow(workflow):
"""Run the workflow with human-in-the-loop interaction."""
pending_responses: dict[str, str] | None = None
completed = False
workflow_output: str | None = None
print("🎯 Number Guessing Game")
print("Think of a number between 1 and 10, and I'll try to guess it!")
print("-" * 50)
while not completed:
# First iteration uses run_stream("start")
# Subsequent iterations use send_responses_streaming with pending responses
stream = (
workflow.send_responses_streaming(pending_responses)
if pending_responses
else workflow.run_stream("start")
)
# Collect events for this turn
events = [event async for event in stream]
pending_responses = None
# Process events to collect requests and detect completion
requests: list[tuple[str, str]] = [] # (request_id, prompt)
for event in events:
if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
# RequestInfoEvent for our HumanFeedbackRequest
requests.append((event.request_id, event.data.prompt))
elif isinstance(event, WorkflowOutputEvent):
# Capture workflow output when yielded
workflow_output = str(event.data)
completed = True
# Check workflow status
pending_status = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
for e in events
)
idle_with_requests = any(
isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
for e in events
)
if pending_status:
print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
if idle_with_requests:
print("⏸️ State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")
# Handle human requests if any
if requests and not completed:
responses: dict[str, str] = {}
for req_id, prompt in requests:
print(f"\n🤖 {prompt}")
answer = input("👤 Enter higher/lower/correct/exit: ").lower()
if answer == "exit":
print("👋 Exiting...")
return
responses[req_id] = answer
pending_responses = responses
# Show final result
print(f"\n🎉 {workflow_output}")
Запуск примера
Полную рабочую реализацию см. в примере "Игра в угадывание" "Человек в контуре".
Принцип работы
Инициализация рабочего процесса: рабочий процесс начинается с
TurnManagerзапроса первоначального предположения от агента ИИ.Ответ агента: агент ИИ делает предположение и возвращает структурированный JSON, который возвращается обратно в
TurnManager.Запрос человека:
TurnManagerобрабатывает угадывание агента и вызываетctx.request_info()сHumanFeedbackRequest.Приостановка рабочего процесса: рабочий процесс выдает
RequestInfoEvent, продолжается до тех пор, пока не останется дальнейших возможных действий, а затем ожидает ввода от человека.Реагирование человека: внешнее приложение собирает данные человека и отправляет ответы обратно с помощью
send_responses_streaming().Возобновление и продолжение: рабочий процесс возобновляется,
TurnManager@response_handlerметод обрабатывает отзывы человека и либо заканчивает игру, либо отправляет другой запрос агенту.
Ключевые преимущества
- Структурированный обмен данными: типобезопасные модели запросов и ответов предотвращают ошибки во время выполнения
- Корреляция: идентификаторы запросов обеспечивают соответствие ответов правильным запросам
- Приостановка выполнения: рабочие процессы могут приостановиться на неопределенный срок, ожидая внешних входных данных
- Сохранение состояния: состояние рабочего процесса сохраняется в циклах приостановки и возобновления
- На основе событий: расширенная система событий обеспечивает видимость состояния рабочего процесса и переходов
Этот шаблон позволяет создавать сложные интерактивные приложения, в которых агенты ИИ и люди легко взаимодействуют в структурированных рабочих процессах.