Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Ce guide vous aide à mettre à niveau vos flux de travail Python vers les dernières modifications d’API introduites dans la version 1.0.0b251104.
Vue d’ensemble des modifications
Cette version comprend deux améliorations majeures apportées au système de flux de travail :
1. API d’exécution de flux de travail consolidées
Les méthodes d’exécution du flux de travail ont été unifiées par souci de simplicité :
-
méthodes unifiées
run_stream()etrun(): Remplacer les méthodes séparées spécifiques aux points de contrôle (run_stream_from_checkpoint(),run_from_checkpoint()) -
Interface unique : utiliser un
checkpoint_idparamètre pour reprendre à partir de points de contrôle au lieu de méthodes distinctes - Point de contrôle flexible : configurer le stockage du point de contrôle au moment de la génération ou le remplacer au moment de l’exécution
-
Sémantique plus claire : Paramètres mutuellement exclusifs
message(nouvelle exécution) etcheckpoint_id(resume)
2. Système de Request-Response simplifié
Le système de demande-réponse a été simplifié :
-
Fini les
RequestInfoExecutor: les exécuteurs peuvent désormais envoyer des demandes directement -
Nouveau
@response_handlerdécorateur : Remplacement des gestionnaires de messagesRequestResponse -
Types de demandes simplifiés : aucun héritage de
RequestInfoMessageobligatoire - Fonctionnalités intégrées : tous les exécuteurs prennent automatiquement en charge les fonctionnalités de demande-réponse
-
Graphiques de flux de travail plus propres : supprimer des
RequestInfoExecutornœuds de vos flux de travail
Partie 1 : API d’exécution de flux de travail unifié
Nous vous recommandons d’abord de migrer vers les API de flux de travail consolidées, car cela constitue la base de tous les modèles d’exécution de flux de travail.
Reprise à partir de points de contrôle
Avant (ancienne API) :
# OLD: Separate method for checkpoint resume
async for event in workflow.run_stream_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
):
print(f"Event: {event}")
Après (nouvelle API) :
# NEW: Unified method with checkpoint_id parameter
async for event in workflow.run_stream(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage # Optional if configured at build time
):
print(f"Event: {event}")
Principales différences :
- Utiliser un
checkpoint_idparamètre au lieu d’une méthode distincte - Impossible de fournir à la fois
messageetcheckpoint_id(mutuellement exclusif) - Doit fournir
message(nouvelle exécution) oucheckpoint_id(reprendre) -
checkpoint_storageest facultatif si le checkpoint a été configuré lors de la compilation
API non streaming
La méthode non de diffusion en continu run() suit le même modèle :
Vieux:
result = await workflow.run_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage
)
Nouveau:
result = await workflow.run(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage # Optional if configured at build time
)
Reprise du point de contrôle avec des demandes en attente
Changement majeur important : lors de la reprise depuis un point de contrôle qui contient des objets en attente RequestInfoEvent, la nouvelle API émet automatiquement ces événements. Vous devez les capturer et y répondre.
Avant (ancien comportement) :
# OLD: Could provide responses directly during resume
responses = {
"request-id-1": "user response data",
"request-id-2": "another response"
}
async for event in workflow.run_stream_from_checkpoint(
checkpoint_id="checkpoint-id",
checkpoint_storage=checkpoint_storage,
responses=responses # No longer supported
):
print(f"Event: {event}")
Après (nouveau comportement) :
# NEW: Capture re-emitted pending requests
requests: dict[str, Any] = {}
async for event in workflow.run_stream(checkpoint_id="checkpoint-id"):
if event.type == "request_info":
# Pending requests are automatically re-emitted
print(f"Pending request re-emitted: {event.request_id}")
requests[event.request_id] = event.data
# Collect user responses
responses: dict[str, Any] = {}
for request_id, request_data in requests.items():
response = handle_request(request_data) # Your logic here
responses[request_id] = response
# Send responses back to workflow
async for event in workflow.send_responses_streaming(responses):
if event.type == "output":
print(f"Workflow output: {event.data}")
Exemple complet d’human-in-the-loop
Voici un exemple complet montrant la reprise du point de contrôle avec l’approbation humaine en attente :
from agent_framework import (
Executor,
FileCheckpointStorage,
WorkflowBuilder,
handler,
response_handler,
)
# ... (Executor definitions omitted for brevity)
async def run_interactive_session(
workflow: Workflow,
initial_message: str | None = None,
checkpoint_id: str | None = None,
) -> str:
"""Run workflow until completion, handling human input interactively."""
requests: dict[str, HumanApprovalRequest] = {}
responses: dict[str, str] | None = None
completed_output: str | None = None
while True:
# Determine which API to call
if responses:
# Send responses from previous iteration
event_stream = workflow.send_responses_streaming(responses)
requests.clear()
responses = None
else:
# Start new run or resume from checkpoint
if initial_message:
event_stream = workflow.run_stream(initial_message)
elif checkpoint_id:
event_stream = workflow.run_stream(checkpoint_id=checkpoint_id)
else:
raise ValueError("Either initial_message or checkpoint_id required")
# Process events
async for event in event_stream:
if event.type == "status":
print(event)
if event.type == "output":
completed_output = event.data
if event.type == "request_info":
if isinstance(event.data, HumanApprovalRequest):
requests[event.request_id] = event.data
# Check completion
if completed_output:
break
# Prompt for user input if we have pending requests
if requests:
responses = prompt_for_responses(requests)
continue
raise RuntimeError("Workflow stopped without completing or requesting input")
return completed_output
Partie 2 : Système de Request-Response simplifié
Après avoir migré vers les API de flux de travail unifiées, mettez à jour vos modèles de demande-réponse pour utiliser le nouveau système intégré.
1. Mettre à jour les importations
Before:
from agent_framework import (
RequestInfoExecutor,
RequestInfoMessage,
RequestResponse,
# ... other imports
)
After:
from agent_framework import (
response_handler,
# ... other imports
# Remove: RequestInfoExecutor, RequestInfoMessage, RequestResponse
)
2. Mettre à jour les types de demandes
Before:
from dataclasses import dataclass
from agent_framework import RequestInfoMessage
@dataclass
class UserApprovalRequest(RequestInfoMessage):
"""Request for user approval."""
prompt: str = ""
context: str = ""
After:
from dataclasses import dataclass
@dataclass
class UserApprovalRequest:
"""Request for user approval."""
prompt: str = ""
context: str = ""
3. Mettre à jour le graphe de flux de travail
Before:
# Old pattern: Required RequestInfoExecutor in workflow
approval_executor = ApprovalRequiredExecutor(id="approval")
request_info_executor = RequestInfoExecutor(id="request_info")
workflow = (
WorkflowBuilder(start_executor=approval_executor)
.add_edge(approval_executor, request_info_executor)
.add_edge(request_info_executor, approval_executor)
.build()
)
After:
# New pattern: Direct request-response capabilities
approval_executor = ApprovalRequiredExecutor(id="approval")
workflow = (
WorkflowBuilder(start_executor=approval_executor)
.build()
)
4. Mettre à jour l’envoi de la demande
Before:
class ApprovalRequiredExecutor(Executor):
@handler
async def process(self, message: str, ctx: WorkflowContext[UserApprovalRequest]) -> None:
request = UserApprovalRequest(
prompt=f"Please approve: {message}",
context="Important operation"
)
await ctx.send_message(request)
After:
class ApprovalRequiredExecutor(Executor):
@handler
async def process(self, message: str, ctx: WorkflowContext) -> None:
request = UserApprovalRequest(
prompt=f"Please approve: {message}",
context="Important operation"
)
await ctx.request_info(request_data=request, response_type=bool)
5. Mise à jour de la gestion des réponses
Before:
class ApprovalRequiredExecutor(Executor):
@handler
async def handle_approval(
self,
response: RequestResponse[UserApprovalRequest, bool],
ctx: WorkflowContext[Never, str]
) -> None:
if response.data:
await ctx.yield_output("Approved!")
else:
await ctx.yield_output("Rejected!")
After:
class ApprovalRequiredExecutor(Executor):
@response_handler
async def handle_approval(
self,
original_request: UserApprovalRequest,
approved: bool,
ctx: WorkflowContext
) -> None:
if approved:
await ctx.yield_output("Approved!")
else:
await ctx.yield_output("Rejected!")
Résumé des avantages
API de flux de travail unifié
- Interface simplifiée : méthode unique pour les exécutions initiales et la reprise du point de contrôle
- Sémantique plus claire : les paramètres mutuellement exclusifs rendent l’intention explicite
- Point de contrôle flexible : configurer au moment de la compilation ou remplacer au moment de l’exécution
- Charge cognitive réduite : moins de méthodes à mémoriser et à gérer
système Request-Response
-
Architecture simplifiée : Aucun besoin de composants distincts
RequestInfoExecutor -
Sécurité des types : spécification directe du type dans les appels
request_info() - Code plus propre : moins d’importations et graphiques de flux de travail plus simples
- Meilleures performances : réduction de la surcharge de routage des messages
- Débogage amélioré : flux d’exécution plus clair et gestion des erreurs
Test de votre migration
Liste de contrôle de la partie 1 : API de flux de travail
-
Mettre à jour les appels d’API : remplacer par
run_stream_from_checkpoint()run_stream(checkpoint_id=...) -
Mettre à jour les appels d’API : remplacer par
run_from_checkpoint()run(checkpoint_id=...) -
Supprimer
responsesle paramètre : supprimer tous lesresponsesarguments des appels de reprise de point de contrôle -
Ajouter une capture d’événements : Implémenter la logique pour capturer les événements request_info réémis (
event.type == "request_info") - Reprise du point de contrôle de test : vérifier que les demandes en attente sont réémises et gérées correctement
Liste de contrôle de la partie 2 : système Request-Response
-
Vérifier les importations : vérifiez qu’aucune ancienne importation ne reste (
RequestInfoExecutor,RequestInfoMessage,RequestResponse) -
Vérifier les types de demandes : confirmer la suppression de l’héritage
RequestInfoMessage -
Graphique de flux de travail de test : vérifier la suppression des
RequestInfoExecutornœuds -
Valider les manipulateurs : assurez-vous que
@response_handlerles décorateurs sont appliqués - Tester de bout en bout : exécuter des scénarios de workflow complets
Étapes suivantes
Une fois la migration terminée :
- Consultez le didacticiel demandes et réponses mis à jour
- Explorer les modèles avancés dans le Guide de l’utilisateur
- Consultez les exemples mis à jour dans le référentiel
Pour obtenir de l’aide supplémentaire, reportez-vous à la documentation agent Framework ou contactez l’équipe et la communauté.