Partager via


Flux de travail Microsoft Agent Framework - Demande et réponse

Cette page fournit une vue d’ensemble du fonctionnement de la gestion des demandes et des réponses dans le système de flux de travail Microsoft Agent Framework.

Aperçu

Les exécuteurs d’un flux de travail peuvent envoyer des demandes en dehors du flux de travail et attendre les réponses. Cela est utile pour les scénarios où un exécuteur doit interagir avec des systèmes externes, tels que des interactions humaines dans la boucle ou d’autres opérations asynchrones.

Activer la gestion des demandes et des réponses dans un flux de travail

Les requêtes et les réponses sont gérées par le biais d’un type spécial appelé InputPort.

// Create an input port that receives requests of type CustomRequestType and responses of type CustomResponseType.
var inputPort = InputPort.Create<CustomRequestType, CustomResponseType>("input-port");

Ajoutez le port d’entrée à un flux de travail.

var executorA = new SomeExecutor();
var workflow = new WorkflowBuilder(inputPort)
    .AddEdge(inputPort, executorA)
    .AddEdge(executorA, inputPort)
    .Build<CustomRequestType>();

Maintenant, étant donné que dans le processus nous avons executorA connecté à inputPort dans les deux directions, executorA doit être en mesure d’envoyer des demandes et de recevoir des réponses via le inputPort. Voici ce que nous devons faire dans SomeExecutor pour envoyer une demande et recevoir une réponse.

internal sealed class SomeExecutor() : Executor<CustomResponseType>("SomeExecutor")
{
    public async ValueTask HandleAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

SomeExecutor Vous pouvez également séparer la gestion des demandes d’envoi et de réponse en deux gestionnaires.

internal sealed class SomeExecutor() : Executor("SomeExecutor")
{
    protected override RouteBuilder ConfigureRoutes(RouteBuilder routeBuilder)
    {
        return routeBuilder
            .AddHandler<CustomResponseType>(this.HandleCustomResponseAsync)
            .AddHandler<OtherDataType>(this.HandleOtherDataAsync);
    }

    public async ValueTask HandleCustomResponseAsync(CustomResponseType message, IWorkflowContext context)
    {
        // Process the response...
        ...
    }

    public async ValueTask HandleOtherDataAsync(OtherDataType message, IWorkflowContext context)
    {
        // Process the message...
        ...
        // Send a request
        await context.SendMessageAsync(new CustomRequestType(...)).ConfigureAwait(false);
    }
}

Les exécuteurs peuvent envoyer des demandes à l’aide ctx.request_info() et gérer des réponses avec @response_handler.

from agent_framework import response_handler, WorkflowBuilder

executor_a = SomeExecutor()
executor_b = SomeOtherExecutor()
workflow_builder = WorkflowBuilder()
workflow_builder.set_start_executor(executor_a)
workflow_builder.add_edge(executor_a, executor_b)
workflow = workflow_builder.build()

executor_a peut envoyer des demandes et recevoir des réponses directement à l’aide de fonctionnalités intégrées.

from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
    response_handler,
)

class SomeExecutor(Executor):

    @handler
    async def handle_data(
        self,
        data: OtherDataType,
        context: WorkflowContext,
    ):
        # Process the message...
        ...
        # Send a request using the API
        await context.request_info(
            request_data=CustomRequestType(...),
            response_type=CustomResponseType
        )

    @response_handler
    async def handle_response(
        self,
        original_request: CustomRequestType,
        response: CustomResponseType,
        context: WorkflowContext,
    ):
        # Process the response...
        ...

Le @response_handler décorateur inscrit automatiquement la méthode pour gérer les réponses pour les types de demande et de réponse spécifiés.

Gestion des demandes et des réponses

Un InputPort émet une RequestInfoEvent fois qu’il reçoit une demande. Vous pouvez vous abonner à ces événements pour gérer les demandes entrantes à partir du flux de travail. Lorsque vous recevez une réponse d’un système externe, renvoyez-la au flux de travail à l’aide du mécanisme de réponse. L’infrastructure achemine automatiquement la réponse vers l’exécuteur qui a envoyé la requête d’origine.

StreamingRun handle = await InProcessExecution.StreamAsync(workflow, input).ConfigureAwait(false);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync().ConfigureAwait(false))
{
    switch (evt)
    {
        case RequestInfoEvent requestInputEvt:
            // Handle `RequestInfoEvent` from the workflow
            ExternalResponse response = requestInputEvt.Request.CreateResponse<CustomResponseType>(...);
            await handle.SendResponseAsync(response).ConfigureAwait(false);
            break;

        case WorkflowOutputEvent workflowOutputEvt:
            // The workflow has completed successfully
            Console.WriteLine($"Workflow completed with result: {workflowOutputEvt.Data}");
            return;
    }
}

Les exécuteurs peuvent envoyer directement des demandes sans avoir besoin d’un composant distinct. Lorsqu’un exécuteur appelle ctx.request_info(), le flux de travail émet un RequestInfoEvent. Vous pouvez vous abonner à ces événements pour gérer les demandes entrantes à partir du flux de travail. Lorsque vous recevez une réponse d’un système externe, renvoyez-la au flux de travail à l’aide du mécanisme de réponse. L’infrastructure achemine automatiquement la réponse à la méthode de l’exécuteur @response_handler .

from agent_framework import RequestInfoEvent

while True:
    request_info_events : list[RequestInfoEvent] = []
    pending_responses : dict[str, CustomResponseType] = {}

    stream = workflow.run_stream(input) if not pending_responses else workflow.send_responses_streaming(pending_responses)

    async for event in stream:
        if isinstance(event, RequestInfoEvent):
            # Handle `RequestInfoEvent` from the workflow
            request_info_events.append(event)

    if not request_info_events:
        break

    for request_info_event in request_info_events:
        # Handle `RequestInfoEvent` from the workflow
        response = CustomResponseType(...)
        pending_responses[request_info_event.request_id] = response

Points de contrôle et demandes

Pour en savoir plus sur les points de contrôle, reportez-vous à cette page.

Lorsqu’un point de contrôle est créé, les demandes en attente sont également enregistrées dans le cadre de l’état du point de contrôle. Lorsque vous effectuez une restauration à partir d’un point de contrôle, toutes les demandes en attente sont recrétées en tant qu’objets RequestInfoEvent , ce qui vous permet de capturer et de répondre à ces demandes. Vous ne pouvez pas fournir de réponses directement pendant l’opération de reprise. Au lieu de cela, vous devez écouter les événements re-émis et répondre à l’aide du mécanisme de réponse standard.

Étapes suivantes