Aracılığıyla paylaş


İş Akışlarında İstekleri ve Yanıtları İşleme

Bu öğreticide, Agent Framework İş Akışları kullanılarak iş akışlarındaki isteklerin ve yanıtların nasıl işleneceğini gösterilmektedir. Dış kaynaklardan (insanlar veya diğer sistemler gibi) giriş istemek için yürütmeyi duraklatabilen etkileşimli iş akışları oluşturmayı ve ardından bir yanıt sağlandıktan sonra devam etmeyi öğreneceksiniz.

Ele Alınan Kavramlar

.NET'te insan-döngüsünde iş akışları, yürütmeyi duraklatmak ve kullanıcı girişi toplamak için RequestPort ve dış istek işlemeyi kullanır. Bu düzen, yürütme sırasında sistemin dış kaynaklardan bilgi isteyebildiği etkileşimli iş akışları sağlar.

Önkoşullar

NuGet paketlerini yükleme

İlk olarak, .NET projeniz için gerekli paketleri yükleyin:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Temel Bileşenler

RequestPort ve Dış İstekler

A RequestPort , iş akışı ile dış giriş kaynakları arasında köprü görevi görür. İş akışı girişe ihtiyaç duyduğunda, uygulamanızın işlediği bir RequestInfoEvent oluşturur:

// Create a RequestPort for handling human input requests
RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");

Sinyal Türleri

Farklı istek türlerini iletmek için sinyal türlerini tanımlayın:

/// <summary>
/// Signals used for communication between guesses and the JudgeExecutor.
/// </summary>
internal enum NumberSignal
{
    Init,     // Initial guess request
    Above,    // Previous guess was too high
    Below,    // Previous guess was too low
}

İş Akışı Yürütücüsü

Kullanıcı girişini işleyen ve geri bildirim sağlayan yürütücüler oluşturun:

/// <summary>
/// Executor that judges the guess and provides feedback.
/// </summary>
internal sealed class JudgeExecutor : Executor<int>("Judge")
{
    private readonly int _targetNumber;
    private int _tries;

    public JudgeExecutor(int targetNumber) : this()
    {
        _targetNumber = targetNumber;
    }

    public override async ValueTask HandleAsync(int message, IWorkflowContext context, CancellationToken cancellationToken)
    {
        _tries++;
        if (message == _targetNumber)
        {
            await context.YieldOutputAsync($"{_targetNumber} found in {_tries} tries!", cancellationToken)
                         .ConfigureAwait(false);
        }
        else if (message < _targetNumber)
        {
            await context.SendMessageAsync(NumberSignal.Below, cancellationToken).ConfigureAwait(false);
        }
        else
        {
            await context.SendMessageAsync(NumberSignal.Above, cancellationToken).ConfigureAwait(false);
        }
    }
}

İş Akışı Oluşturma

RequestPort ve yürütücüye bir geri bildirim döngüsünde bağlanın:

internal static class WorkflowHelper
{
    internal static ValueTask<Workflow<NumberSignal>> GetWorkflowAsync()
    {
        // Create the executors
        RequestPort numberRequestPort = RequestPort.Create<NumberSignal, int>("GuessNumber");
        JudgeExecutor judgeExecutor = new(42);

        // Build the workflow by connecting executors in a loop
        return new WorkflowBuilder(numberRequestPort)
            .AddEdge(numberRequestPort, judgeExecutor)
            .AddEdge(judgeExecutor, numberRequestPort)
            .WithOutputFrom(judgeExecutor)
            .BuildAsync<NumberSignal>();
    }
}

Etkileşimli İş Akışını Yürütme

İş akışı yürütme sırasında dış istekleri işleme:

private static async Task Main()
{
    // Create the workflow
    var workflow = await WorkflowHelper.GetWorkflowAsync().ConfigureAwait(false);

    // Execute the workflow
    await using StreamingRun handle = await InProcessExecution.StreamAsync(workflow, NumberSignal.Init).ConfigureAwait(false);
    await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
    {
        switch (evt)
        {
            case RequestInfoEvent requestInputEvt:
                // Handle human input request from the workflow
                ExternalResponse response = HandleExternalRequest(requestInputEvt.Request);
                await handle.SendResponseAsync(response).ConfigureAwait(false);
                break;

            case WorkflowOutputEvent outputEvt:
                // The workflow has yielded output
                Console.WriteLine($"Workflow completed with result: {outputEvt.Data}");
                return;
        }
    }
}

İstek İşleme

Farklı giriş isteği türlerini işleme:

private static ExternalResponse HandleExternalRequest(ExternalRequest request)
{
    switch (request.DataAs<NumberSignal?>())
    {
        case NumberSignal.Init:
            int initialGuess = ReadIntegerFromConsole("Please provide your initial guess: ");
            return request.CreateResponse(initialGuess);
        case NumberSignal.Above:
            int lowerGuess = ReadIntegerFromConsole("You previously guessed too large. Please provide a new guess: ");
            return request.CreateResponse(lowerGuess);
        case NumberSignal.Below:
            int higherGuess = ReadIntegerFromConsole("You previously guessed too small. Please provide a new guess: ");
            return request.CreateResponse(higherGuess);
        default:
            throw new ArgumentException("Unexpected request type.");
    }
}

private static int ReadIntegerFromConsole(string prompt)
{
    while (true)
    {
        Console.Write(prompt);
        string? input = Console.ReadLine();
        if (int.TryParse(input, out int value))
        {
            return value;
        }
        Console.WriteLine("Invalid input. Please enter a valid integer.");
    }
}

Uygulama Kavramları

RequestInfoEvent Akışı

  1. İş Akışı Yürütme: İş akışı, dış girişe ihtiyaç duyana kadar işler
  2. İstek Oluşturma: RequestPort, istek ayrıntılarıyla birlikte bir RequestInfoEvent oluşturur
  3. Dış İşleme: Uygulamanız olayı yakalar ve kullanıcı girişi toplar
  4. Yanıt Gönderme: İş akışına devam etmek için geri gönder ExternalResponse
  5. İş Akışı Yeniden Başlatma: İş akışı sağlanan girişle işlemeye devam eder

İş Akışı Yaşam Döngüsü

  • Akış Yürütme: Olayları gerçek zamanlı olarak izlemek için kullanın StreamAsync
  • Olay İşleme: RequestInfoEvent Giriş istekleri ve WorkflowOutputEvent tamamlama işlemi
  • Yanıt Koordinasyonu: İş akışının yanıt işleme mekanizmasını kullanarak yanıtları isteklerle eşleştirme

Uygulama Akışı

  1. İş Akışı Başlatma: İş akışı, RequestPort'a bir NumberSignal.Init göndererek başlar.

  2. İstek Oluşturma: RequestPort, kullanıcıdan ilk tahmini isteyen bir RequestInfoEvent istek oluşturur.

  3. İş Akışı Duraklatma: uygulama isteği işlerken iş akışı duraklatılır ve dış girişi bekler.

  4. İnsan Yanıtı: Dış uygulama kullanıcı girişini toplar ve iş akışına geri gönderir ExternalResponse .

  5. İşleme ve Geri Bildirim: Tahmin JudgeExecutor işlenir ve iş akışını tamamlar veya başka bir tahmin istemek için yeni bir sinyal (Yukarıda/Aşağıda) gönderir.

  6. Döngü Devamı: doğru sayı tahmin edilene kadar işlem yineleniyor.

Çerçeve Avantajları

  • Tür Güvenliği: Güçlü yazma, istek yanıtı sözleşmelerinin korunmasını sağlar
  • Event-Driven: Zengin olay sistemi iş akışı yürütmesi için görünürlük sağlar
  • Duraklatılabilir Yürütme: İş akışları, dış giriş beklerken süresiz olarak duraklatılabilir
  • Durum Yönetimi: İş akışı durumu duraklatma-sürdürme döngüleri arasında korunur
  • Esnek Tümleştirme: RequestPort'lar herhangi bir dış giriş kaynağıyla (UI, API, konsol vb.) tümleştirebilir

Tam Örnek

Tam kapsamlı çalışma uygulaması için bkz. Döngüde İnsan-İçe Temel örneği.

Bu düzen, kullanıcıların otomatik iş akışları içindeki önemli karar noktalarında giriş sağlayabildiği gelişmiş etkileşimli uygulamalar oluşturmanızı sağlar.

Neler Oluşturacaksınız

İstek yanıtı desenlerini gösteren etkileşimli bir sayı tahmin oyunu iş akışı oluşturacaksınız:

  • Akıllı tahminler yapan bir yapay zeka aracısı
  • API'yi kullanarak doğrudan istek gönderebilen yürütücüler
  • Aracı ile insan etkileşimlerini koordine eden bir dönüş yöneticisi @response_handler
  • Gerçek zamanlı geri bildirim için etkileşimli konsol girişi/çıkışı

Önkoşullar

  • Python 3.10 veya üzeri
  • Azure OpenAI dağıtımı yapılandırıldı
  • Azure CLI kimlik doğrulaması yapılandırıldı (az login)
  • Python zaman uyumsuz programlama hakkında temel bilgiler

Önemli Kavramlar

İstekler ve Yanıtlar Yetkinlikleri

Yürütücüler, döngüde insan etkileşimlerini etkinleştiren yerleşik istek ve yanıt özelliklerine sahiptir:

  • İstek göndermek için ctx.request_info(request_data=request_data, response_type=response_type) çağrı yap
  • Yanıtları işlemek için @response_handler dekoratörünü kullanın
  • Devralma gereksinimleri olmadan özel istek/yanıt türlerini tanımlama

Request-Response Akışı

Yürütücüler ctx.request_info() doğrudan istek gönderebilir ve yanıtları @response_handler dekoratörüyle işleyebilir.

  1. Yürütücü ctx.request_info(request_data=request_data, response_type=response_type) çağırır
  2. İş akışı, istek verileriyle bir RequestInfoEvent yayar
  3. Dış sistem (insan, API vb.) isteği işler
  4. Yanıt şu yolla geri gönderilir: send_responses_streaming()
  5. İş akışı devam eder ve yanıtı yürütücü @response_handler yöntemine teslim eder

Ortamı Ayarlama

İlk olarak gerekli paketleri yükleyin:

pip install agent-framework-core --pre
pip install azure-identity

İstek ve Yanıt Modellerini Tanımlama

İstek-yanıt iletişimi için veri yapılarını tanımlayarak başlayın:

import asyncio
from dataclasses import dataclass
from pydantic import BaseModel

from agent_framework import (
    AgentExecutor,
    AgentExecutorRequest,
    AgentExecutorResponse,
    ChatMessage,
    Executor,
    RequestInfoEvent,
    Role,
    WorkflowBuilder,
    WorkflowContext,
    WorkflowOutputEvent,
    WorkflowRunState,
    WorkflowStatusEvent,
    handler,
    response_handler,
)
from agent_framework.azure import AzureOpenAIChatClient
from azure.identity import AzureCliCredential

@dataclass
class HumanFeedbackRequest:
    """Request message for human feedback in the guessing game."""
    prompt: str = ""
    guess: int | None = None

class GuessOutput(BaseModel):
    """Structured output from the AI agent with response_format enforcement."""
    guess: int

HumanFeedbackRequest, yapılandırılmış istek yükleri için basit bir veri sınıfıdır:

  • İstek yükleri için güçlü yazma
  • İleriye uyumlu doğrulama
  • Yanıtlarla ilişki semantiğini netleştirme
  • Zengin kullanıcı arabirimi istemleri için bağlamsal alanlar (önceki tahmin gibi)

Turn Manager'ı oluştur

Dönüş yöneticisi, yapay zeka aracısı ile insan arasındaki akışı koordine eder:

class TurnManager(Executor):
    """Coordinates turns between the AI agent and human player.

    Responsibilities:
    - Start the game by requesting the agent's first guess
    - Process agent responses and request human feedback
    - Handle human feedback and continue the game or finish
    """

    def __init__(self, id: str | None = None):
        super().__init__(id=id or "turn_manager")

    @handler
    async def start(self, _: str, ctx: WorkflowContext[AgentExecutorRequest]) -> None:
        """Start the game by asking the agent for an initial guess."""
        user = ChatMessage(Role.USER, text="Start by making your first guess.")
        await ctx.send_message(AgentExecutorRequest(messages=[user], should_respond=True))

    @handler
    async def on_agent_response(
        self,
        result: AgentExecutorResponse,
        ctx: WorkflowContext,
    ) -> None:
        """Handle the agent's guess and request human guidance."""
        # Parse structured model output (defensive default if agent didn't reply)
        text = result.agent_run_response.text or ""
        last_guess = GuessOutput.model_validate_json(text).guess if text else None

        # Craft a clear human prompt that defines higher/lower relative to agent's guess
        prompt = (
            f"The agent guessed: {last_guess if last_guess is not None else text}. "
            "Type one of: higher (your number is higher than this guess), "
            "lower (your number is lower than this guess), correct, or exit."
        )
        # Send a request using the request_info API
        await ctx.request_info(
            request_data=HumanFeedbackRequest(prompt=prompt, guess=last_guess),
            response_type=str
        )

    @response_handler
    async def on_human_feedback(
        self,
        original_request: HumanFeedbackRequest,
        feedback: str,
        ctx: WorkflowContext[AgentExecutorRequest, str],
    ) -> None:
        """Continue the game or finish based on human feedback."""
        reply = feedback.strip().lower()
        # Use the correlated request's guess to avoid extra state reads
        last_guess = original_request.guess

        if reply == "correct":
            await ctx.yield_output(f"Guessed correctly: {last_guess}")
            return

        # Provide feedback to the agent for the next guess
        user_msg = ChatMessage(
            Role.USER,
            text=f'Feedback: {reply}. Return ONLY a JSON object matching the schema {{"guess": <int 1..10>}}.',
        )
        await ctx.send_message(AgentExecutorRequest(messages=[user_msg], should_respond=True))

İş Akışı Oluşturma

Tüm bileşenleri bağlayan ana iş akışını oluşturun:

async def main() -> None:
    # Create the chat agent with structured output enforcement
    chat_client = AzureOpenAIChatClient(credential=AzureCliCredential())
    agent = chat_client.create_agent(
        instructions=(
            "You guess a number between 1 and 10. "
            "If the user says 'higher' or 'lower', adjust your next guess. "
            'You MUST return ONLY a JSON object exactly matching this schema: {"guess": <integer 1..10>}. '
            "No explanations or additional text."
        ),
        response_format=GuessOutput,
    )

    # Create workflow components
    turn_manager = TurnManager(id="turn_manager")
    agent_exec = AgentExecutor(agent=agent, id="agent")

    # Build the workflow graph
    workflow = (
        WorkflowBuilder()
        .set_start_executor(turn_manager)
        .add_edge(turn_manager, agent_exec)  # Ask agent to make/adjust a guess
        .add_edge(agent_exec, turn_manager)  # Agent's response goes back to coordinator
        .build()
    )

    # Execute the interactive workflow
    await run_interactive_workflow(workflow)

async def run_interactive_workflow(workflow):
    """Run the workflow with human-in-the-loop interaction."""
    pending_responses: dict[str, str] | None = None
    completed = False
    workflow_output: str | None = None

    print("🎯 Number Guessing Game")
    print("Think of a number between 1 and 10, and I'll try to guess it!")
    print("-" * 50)

    while not completed:
        # First iteration uses run_stream("start")
        # Subsequent iterations use send_responses_streaming with pending responses
        stream = (
            workflow.send_responses_streaming(pending_responses)
            if pending_responses
            else workflow.run_stream("start")
        )

        # Collect events for this turn
        events = [event async for event in stream]
        pending_responses = None

        # Process events to collect requests and detect completion
        requests: list[tuple[str, str]] = []  # (request_id, prompt)
        for event in events:
            if isinstance(event, RequestInfoEvent) and isinstance(event.data, HumanFeedbackRequest):
                # RequestInfoEvent for our HumanFeedbackRequest
                requests.append((event.request_id, event.data.prompt))
            elif isinstance(event, WorkflowOutputEvent):
                # Capture workflow output when yielded
                workflow_output = str(event.data)
                completed = True

        # Check workflow status
        pending_status = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IN_PROGRESS_PENDING_REQUESTS
            for e in events
        )
        idle_with_requests = any(
            isinstance(e, WorkflowStatusEvent) and e.state == WorkflowRunState.IDLE_WITH_PENDING_REQUESTS
            for e in events
        )

        if pending_status:
            print("🔄 State: IN_PROGRESS_PENDING_REQUESTS (requests outstanding)")
        if idle_with_requests:
            print("⏸️  State: IDLE_WITH_PENDING_REQUESTS (awaiting human input)")

        # Handle human requests if any
        if requests and not completed:
            responses: dict[str, str] = {}
            for req_id, prompt in requests:
                print(f"\n🤖 {prompt}")
                answer = input("👤 Enter higher/lower/correct/exit: ").lower()

                if answer == "exit":
                    print("👋 Exiting...")
                    return
                responses[req_id] = answer
            pending_responses = responses

    # Show final result
    print(f"\n🎉 {workflow_output}")

Örneği Çalıştırma

Çalışan uygulamanın tamamı için bkz. Döngüde İnsan Tahmin Oyunu örneği.

Nasıl Çalışır?

  1. İş Akışı Başlatma: İş akışı, yapay zeka aracısından TurnManager ilk tahmin istemekle başlar.

  2. Aracı Yanıtı: Yapay zeka aracısı bir tahminde bulunur ve yapılandırılmış JSON'u TurnManager ögesine geri akıtır.

  3. İnsan İsteği: Aracının TurnManager tahminini işler ve ctx.request_info()'yi bir HumanFeedbackRequest ile çağırır.

  4. İş Akışı Duraklatma: İş akışı bir RequestInfoEvent üretir ve başka bir işlem yapılamayana kadar devam eder, ardından insan girişini bekler.

  5. İnsan Yanıtı: Dış uygulama insan girişi toplar ve kullanarak send_responses_streaming() yanıtları geri gönderir.

  6. Sürdür ve Devam Et: İş akışı sürdürülür, TurnManageryöntemi @response_handler insan geri bildirimini işler ve oyunu sonlandırır veya aracıya başka bir istek gönderir.

Önemli Avantajlar

  • Yapılandırılmış İletişim: Tür açısından güvenli istek ve yanıt modelleri çalışma zamanı hatalarını önler
  • Bağıntı: İstek kimlikleri yanıtların doğru isteklerle eşleştiğinden emin olun
  • Duraklatılabilir Yürütme: İş akışları, dış giriş beklerken süresiz olarak duraklatılabilir
  • Durum Koruması: İş akışı durumu duraklatma-sürdürme döngüleri arasında korunur
  • Olay Odaklı: Zengin olay sistemi, iş akışı durumu ve geçişleri için görünürlük sağlar

Bu düzen, yapay zeka aracılarının ve insanların yapılandırılmış iş akışları içinde sorunsuz bir şekilde işbirliği yaptığı gelişmiş etkileşimli uygulamalar oluşturmanızı sağlar.

Sonraki Adımlar