Поделиться через


Обработка запросов и ответов в рабочих процессах

В этом руководстве показано, как обрабатывать запросы и ответы в рабочих процессах с помощью рабочих процессов Agent Framework. Вы узнаете, как создать интерактивные рабочие процессы, которые могут приостановить выполнение для запроса входных данных из внешних источников (например, людей или других систем), а затем возобновить работу после предоставления ответа.

Основные понятия, описанные в статье

В .NET рабочие процессы с участием человека используют RequestPort и внешнее управление запросами для приостановки выполнения операций и сбора пользовательского ввода. Этот шаблон поддерживает интерактивные рабочие процессы, в которых система может запрашивать информацию из внешних источников во время выполнения.

Предпосылки

Установка пакетов Nuget

Сначала установите необходимые пакеты для проекта .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Ключевые компоненты

RequestPort и внешние запросы

Элемент RequestPort выступает в качестве моста между рабочим процессом и внешними источниками входных данных. Когда рабочий процесс требует ввода данных, он генерирует RequestInfoEvent, который обрабатывает ваше приложение.

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Типы сигналов

Определите типы сигналов для взаимодействия с различными типами запросов:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

Исполнитель рабочего процесса

Создайте исполнителя, обрабатывающие входные данные пользователя и предоставляющие отзыв:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

Создание рабочего процесса

Подключите «requestPort» и исполнителя в обратную цепь:

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Выполнение интерактивного рабочего процесса

Обработка внешних запросов во время выполнения рабочего процесса:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

Обработка запросов

Обработка различных типов входных запросов:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Основные понятия реализации

Поток данных RequestInfoEvent

  1. Выполнение рабочего процесса: процесс выполняется до тех пор, пока не понадобятся внешние входные данные
  2. Создание запросов: RequestPort создает RequestInfoEvent запрос с подробными сведениями о запросе
  3. Внешняя обработка: приложение перехватывает событие и собирает входные данные пользователя
  4. Отправка ответа: верните ExternalResponse для продолжения рабочего процесса
  5. Возобновление рабочего процесса: рабочий процесс продолжает обработку с предоставленными входными данными

Жизненный цикл рабочего процесса

  • Выполнение потоковой передачи. Использование StreamAsync для отслеживания событий в режиме реального времени
  • Обработка событий: процесс RequestInfoEvent ввода запросов и WorkflowOutputEvent завершения
  • Координация ответов: сопоставление ответов на запросы с помощью механизма обработки ответов рабочего процесса

Поток реализации

  1. Инициализация рабочего процесса: рабочий процесс начинается с отправки NumberSignal.Init в RequestPort.

  2. Создание запросов: RequestPort создает RequestInfoEvent запрос первоначального предположения от пользователя.

  3. Приостановка рабочего процесса: рабочий процесс приостанавливает и ожидает внешних входных данных, пока приложение обрабатывает запрос.

  4. Человеческая реакция: внешнее приложение собирает входные данные пользователя и отправляет ExternalResponse обратно в рабочий процесс.

  5. Обработка и обратная связь: JudgeExecutor обрабатывает предположение и либо завершает рабочий процесс, либо отправляет новый сигнал (выше или ниже), чтобы запросить другое предположение.

  6. Продолжение цикла: процесс повторяется, пока не будет угадывается правильное число.

Преимущества платформы

  • Безопасность типов: строгая типизация гарантирует, что контракты на запросы и ответы сохраняются
  • На основе событий: система событий с расширенными возможностями обеспечивает видимость выполнения рабочего процесса.
  • Приостановка выполнения: рабочие процессы могут приостановиться на неопределенный срок, ожидая внешних входных данных
  • Управление состоянием: состояние рабочего процесса сохраняется в циклах приостановки и возобновления
  • Гибкая интеграция: RequestPorts может интегрироваться с любым внешним источником входных данных (пользовательский интерфейс, API, консоль и т. д.)

Полный пример

Полный рабочий пример см. в примере "Человек в цикле. Основной пример".

Этот шаблон позволяет создавать сложные интерактивные приложения, где пользователи могут предоставлять входные данные в ключевых точках принятия решений в автоматизированных рабочих процессах.

Что вы будете создавать

Вы создадите интерактивный рабочий процесс определения числа, демонстрирующий шаблоны ответа на запросы:

  • Агент искусственного интеллекта, который делает интеллектуальные догадки
  • Исполнители, которые могут напрямую отправлять запросы с помощью request_info API
  • Менеджер по очередям, который координирует взаимодействие между агентом и человеком с помощью @response_handler
  • Интерактивные входные и выходные данные консоли для обратной связи в режиме реального времени

Предпосылки

  • Python 3.10 или более поздней версии
  • Настроено развертывание Azure OpenAI
  • Настроена проверка подлинности Azure CLI (az login)
  • Базовое понимание асинхронного программирования Python

Основные понятия

Возможности запросов и ответов

У исполнителей есть встроенные возможности запросов и ответов, которые обеспечивают человеческое участие в процессе.

  • Вызов ctx.request_info(request_data=request_data, response_type=response_type) для отправки запросов
  • @response_handler Используйте декоратор для обработки ответов
  • Определение пользовательских типов запросов и ответов без требований к наследованию

поток запрос-ответ

Исполнители могут отправлять запросы непосредственно с помощью ctx.request_info() и обрабатывать ответы с использованием декоратора @response_handler.

  1. Вызовы к исполнителю ctx.request_info(request_data=request_data, response_type=response_type)
  2. Процесс RequestInfoEvent рабочего процесса выдает данные запроса
  3. Внешняя система (человеческая, API и т. д.) обрабатывает запрос
  4. Ответ отправляется обратно через send_responses_streaming()
  5. Рабочий процесс возобновляет работу и предоставляет ответ на метод исполнителя @response_handler

Настройка среды

Сначала установите необходимые пакеты:

pip install agent-framework-core --pre
pip install azure-identity

Определение моделей запросов и ответов

Начните с определения структур данных для обмена данными с запросами:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

Это HumanFeedbackRequest простой класс данных для структурированных нагрузок запросов.

  • Жёсткая типизация нагрузок запроса
  • Проверка на совместимость с будущими версиями
  • Очистка семантики корреляции с помощью ответов
  • Контекстные поля (например, предыдущее предположение) для запросов с расширенным пользовательским интерфейсом

Создание диспетчера поворотов

Менеджер очередности координирует поток между ИИ-агентом и человеком.

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

Создание рабочего процесса

Создайте основной рабочий процесс, который подключает все компоненты:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.as_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Запуск примера

Полную рабочую реализацию см. в примере "Игра в угадывание" "Человек в контуре".

Принцип работы

  1. Инициализация рабочего процесса: рабочий процесс начинается с TurnManager запроса первоначального предположения от агента ИИ.

  2. Ответ агента: агент ИИ делает предположение и возвращает структурированный JSON, который возвращается обратно в TurnManager.

  3. Запрос человека: TurnManager обрабатывает угадывание агента и вызывает ctx.request_info() с HumanFeedbackRequest.

  4. Приостановка рабочего процесса: рабочий процесс выдает RequestInfoEvent, продолжается до тех пор, пока не останется дальнейших возможных действий, а затем ожидает ввода от человека.

  5. Реагирование человека: внешнее приложение собирает данные человека и отправляет ответы обратно с помощью send_responses_streaming().

  6. Возобновление и продолжение: рабочий процесс возобновляется, TurnManager@response_handler метод обрабатывает отзывы человека и либо заканчивает игру, либо отправляет другой запрос агенту.

Ключевые преимущества

  • Структурированный обмен данными: типобезопасные модели запросов и ответов предотвращают ошибки во время выполнения
  • Корреляция: идентификаторы запросов обеспечивают соответствие ответов правильным запросам
  • Приостановка выполнения: рабочие процессы могут приостановиться на неопределенный срок, ожидая внешних входных данных
  • Сохранение состояния: состояние рабочего процесса сохраняется в циклах приостановки и возобновления
  • На основе событий: расширенная система событий обеспечивает видимость состояния рабочего процесса и переходов

Этот шаблон позволяет создавать сложные интерактивные приложения, в которых агенты ИИ и люди легко взаимодействуют в структурированных рабочих процессах.

Дальнейшие шаги