Partager via


Guide de mise à niveau : API de flux de travail et système Request-Response

Ce guide vous aide à mettre à niveau vos flux de travail Python vers les dernières modifications d’API introduites dans la version 1.0.0b251104.

Vue d’ensemble des modifications

Cette version comprend deux améliorations majeures apportées au système de flux de travail :

1. API d’exécution de flux de travail consolidées

Les méthodes d’exécution du flux de travail ont été unifiées par souci de simplicité :

  • méthodes unifiées run_stream() et run() : Remplacer les méthodes séparées spécifiques aux points de contrôle (run_stream_from_checkpoint(), run_from_checkpoint())
  • Interface unique : utiliser un checkpoint_id paramètre pour reprendre à partir de points de contrôle au lieu de méthodes distinctes
  • Point de contrôle flexible : configurer le stockage du point de contrôle au moment de la génération ou le remplacer au moment de l’exécution
  • Sémantique plus claire : Paramètres mutuellement exclusifs message (nouvelle exécution) et checkpoint_id (resume)

2. Système de Request-Response simplifié

Le système de demande-réponse a été simplifié :

  • Fini les RequestInfoExecutor : les exécuteurs peuvent désormais envoyer des demandes directement
  • Nouveau @response_handler décorateur : Remplacement des gestionnaires de messages RequestResponse
  • Types de demandes simplifiés : aucun héritage de RequestInfoMessage obligatoire
  • Fonctionnalités intégrées : tous les exécuteurs prennent automatiquement en charge les fonctionnalités de demande-réponse
  • Graphiques de flux de travail plus propres : supprimer des RequestInfoExecutor nœuds de vos flux de travail

Partie 1 : API d’exécution de flux de travail unifié

Nous vous recommandons d’abord de migrer vers les API de flux de travail consolidées, car cela constitue la base de tous les modèles d’exécution de flux de travail.

Reprise à partir de points de contrôle

Avant (ancienne API) :

# OLD: Separate method for checkpoint resume
async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
):
    print(f"Event: {event}")

Après (nouvelle API) :

# NEW: Unified method with checkpoint_id parameter
async for event in workflow.run_stream(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
):
    print(f"Event: {event}")

Principales différences :

  • Utiliser un checkpoint_id paramètre au lieu d’une méthode distincte
  • Impossible de fournir à la fois message et checkpoint_id (mutuellement exclusif)
  • Doit fournir message (nouvelle exécution) ou checkpoint_id (reprendre)
  • checkpoint_storage est facultatif si le checkpoint a été configuré lors de la compilation

API non streaming

La méthode non de diffusion en continu run() suit le même modèle :

Vieux:

result = await workflow.run_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage
)

Nouveau:

result = await workflow.run(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage  # Optional if configured at build time
)

Reprise du point de contrôle avec des demandes en attente

Changement majeur important : lors de la reprise depuis un point de contrôle qui contient des objets en attente RequestInfoEvent, la nouvelle API émet automatiquement ces événements. Vous devez les capturer et y répondre.

Avant (ancien comportement) :

# OLD: Could provide responses directly during resume
responses = {
    "request-id-1": "user response data",
    "request-id-2": "another response"
}

async for event in workflow.run_stream_from_checkpoint(
    checkpoint_id="checkpoint-id",
    checkpoint_storage=checkpoint_storage,
    responses=responses  # No longer supported
):
    print(f"Event: {event}")

Après (nouveau comportement) :

# NEW: Capture re-emitted pending requests
requests: dict[str, Any] = {}

async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
    if event.type == "request_info":
        # Pending requests are automatically re-emitted
        print(f"Pending request re-emitted: {event.request_id}")
        requests[event.request_id] = event.data

# Collect user responses
responses: dict[str, Any] = {}
for request_id, request_data in requests.items():
    response = handle_request(request_data)  # Your logic here
    responses[request_id] = response

# Send responses back to workflow
async for event in workflow.send_responses_streaming(responses):
    if event.type == "output":
        print(f"Workflow output: {event.data}")

Exemple complet d’human-in-the-loop

Voici un exemple complet montrant la reprise du point de contrôle avec l’approbation humaine en attente :

from agent_framework import (
    Executor,
    FileCheckpointStorage,
    WorkflowBuilder,
    handler,
    response_handler,
)

# ... (Executor definitions omitted for brevity)

async def run_interactive_session(
    workflow: Workflow,
    initial_message: str | None = None,
    checkpoint_id: str | None = None,
) -> str:
    """Run workflow until completion, handling human input interactively."""

    requests: dict[str, HumanApprovalRequest] = {}
    responses: dict[str, str] | None = None
    completed_output: str | None = None

    while True:
        # Determine which API to call
        if responses:
            # Send responses from previous iteration
            event_stream = workflow.send_responses_streaming(responses)
            requests.clear()
            responses = None
        else:
            # Start new run or resume from checkpoint
            if initial_message:
                event_stream = workflow.run_stream(initial_message)
            elif checkpoint_id:
                event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
            else:
                raise ValueError("Either initial_message or checkpoint_id required")

        # Process events
        async for event in event_stream:
            if event.type == "status":
                print(event)
            if event.type == "output":
                completed_output = event.data
            if event.type == "request_info":
                if isinstance(event.data, HumanApprovalRequest):
                    requests[event.request_id] = event.data

        # Check completion
        if completed_output:
            break

        # Prompt for user input if we have pending requests
        if requests:
            responses = prompt_for_responses(requests)
            continue

        raise RuntimeError("Workflow stopped without completing or requesting input")

    return completed_output

Partie 2 : Système de Request-Response simplifié

Après avoir migré vers les API de flux de travail unifiées, mettez à jour vos modèles de demande-réponse pour utiliser le nouveau système intégré.

1. Mettre à jour les importations

Before:

from agent_framework import (
    RequestInfoExecutor,
    RequestInfoMessage,
    RequestResponse,
    # ... other imports
)

After:

from agent_framework import (
    response_handler,
    # ... other imports
    # Remove: RequestInfoExecutor, RequestInfoMessage, RequestResponse
)

2. Mettre à jour les types de demandes

Before:

from dataclasses import dataclass
from agent_framework import RequestInfoMessage

@dataclass
class UserApprovalRequest(RequestInfoMessage):
    """Request for user approval."""
    prompt: str = ""
    context: str = ""

After:

from dataclasses import dataclass

@dataclass
class UserApprovalRequest:
    """Request for user approval."""
    prompt: str = ""
    context: str = ""

3. Mettre à jour le graphe de flux de travail

Before:

# Old pattern: Required RequestInfoExecutor in workflow
approval_executor = ApprovalRequiredExecutor(id="approval")
request_info_executor = RequestInfoExecutor(id="request_info")

workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .add_edge(approval_executor, request_info_executor)
    .add_edge(request_info_executor, approval_executor)
    .build()
)

After:

# New pattern: Direct request-response capabilities
approval_executor = ApprovalRequiredExecutor(id="approval")

workflow = (
    WorkflowBuilder(start_executor=approval_executor)
    .build()
)

4. Mettre à jour l’envoi de la demande

Before:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext[UserApprovalRequest]) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.send_message(request)

After:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def process(self, message: str, ctx: WorkflowContext) -> None:
        request = UserApprovalRequest(
            prompt=f"Please approve: {message}",
            context="Important operation"
        )
        await ctx.request_info(request_data=request, response_type=bool)

5. Mise à jour de la gestion des réponses

Before:

class ApprovalRequiredExecutor(Executor):
    @handler
    async def handle_approval(
        self,
        response: RequestResponse[UserApprovalRequest, bool],
        ctx: WorkflowContext[Never, str]
    ) -> None:
        if response.data:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")

After:

class ApprovalRequiredExecutor(Executor):
    @response_handler
    async def handle_approval(
        self,
        original_request: UserApprovalRequest,
        approved: bool,
        ctx: WorkflowContext
    ) -> None:
        if approved:
            await ctx.yield_output("Approved!")
        else:
            await ctx.yield_output("Rejected!")

Résumé des avantages

API de flux de travail unifié

  1. Interface simplifiée : méthode unique pour les exécutions initiales et la reprise du point de contrôle
  2. Sémantique plus claire : les paramètres mutuellement exclusifs rendent l’intention explicite
  3. Point de contrôle flexible : configurer au moment de la compilation ou remplacer au moment de l’exécution
  4. Charge cognitive réduite : moins de méthodes à mémoriser et à gérer

système Request-Response

  1. Architecture simplifiée : Aucun besoin de composants distincts RequestInfoExecutor
  2. Sécurité des types : spécification directe du type dans les appels request_info()
  3. Code plus propre : moins d’importations et graphiques de flux de travail plus simples
  4. Meilleures performances : réduction de la surcharge de routage des messages
  5. Débogage amélioré : flux d’exécution plus clair et gestion des erreurs

Test de votre migration

Liste de contrôle de la partie 1 : API de flux de travail

  1. Mettre à jour les appels d’API : remplacer par run_stream_from_checkpoint()run_stream(checkpoint_id=...)
  2. Mettre à jour les appels d’API : remplacer par run_from_checkpoint()run(checkpoint_id=...)
  3. Supprimer responses le paramètre : supprimer tous les responses arguments des appels de reprise de point de contrôle
  4. Ajouter une capture d’événements : Implémenter la logique pour capturer les événements request_info réémis (event.type == "request_info")
  5. Reprise du point de contrôle de test : vérifier que les demandes en attente sont réémises et gérées correctement

Liste de contrôle de la partie 2 : système Request-Response

  1. Vérifier les importations : vérifiez qu’aucune ancienne importation ne reste (RequestInfoExecutor, RequestInfoMessage, RequestResponse)
  2. Vérifier les types de demandes : confirmer la suppression de l’héritage RequestInfoMessage
  3. Graphique de flux de travail de test : vérifier la suppression des RequestInfoExecutor nœuds
  4. Valider les manipulateurs : assurez-vous que @response_handler les décorateurs sont appliqués
  5. Tester de bout en bout : exécuter des scénarios de workflow complets

Étapes suivantes

Une fois la migration terminée :

  1. Consultez le didacticiel demandes et réponses mis à jour
  2. Explorer les modèles avancés dans le Guide de l’utilisateur
  3. Consultez les exemples mis à jour dans le référentiel

Pour obtenir de l’aide supplémentaire, reportez-vous à la documentation agent Framework ou contactez l’équipe et la communauté.