Remarque
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de vous connecter ou de modifier des répertoires.
L’accès à cette page nécessite une autorisation. Vous pouvez essayer de modifier des répertoires.
Un sous-flux de travail est un flux de travail complet qui s’exécute en tant qu’exécuteur au sein d’un flux de travail parent. Cela vous permet de composer des systèmes complexes à partir de blocs de construction de flux de travail plus petits et réutilisables, chacun avec son propre contexte d’exécution isolé, la gestion de l’état et le routage des messages.
Aperçu
Les sous-flux de travail sont utiles lorsque vous souhaitez :
- Décomposition de la complexité : décomposez un flux de travail volumineux en unités testables plus petites et indépendantes.
- Réutiliser la logique de flux de travail : incorporez le même sous-flux de travail dans plusieurs flux de travail parents.
- Isoler l’état : conservez l’état interne de chaque sous-flux de travail distinct du parent.
- Contrôler le flux de données : les messages entrent et quittent le sous-flux de travail uniquement par ses bords, sans diffusion à différents niveaux.
Lorsqu’un sous-flux de travail est ajouté à un flux de travail parent, il se comporte comme n’importe quel autre exécuteur : il reçoit des messages d’entrée, exécute son graphe interne à l’achèvement et produit des messages de sortie pour les exécuteurs en aval.
Création d’un Sub-Workflow
En C#, vous composez des sous-workflows de deux façons :
-
Liaison directe : permet
BindAsExecutor()d’incorporer un flux de travail directement en tant qu’exécuteur dans le flux de travail parent. Cela conserve les types d’entrée/sortie natifs du sous-flux de travail. -
Habillage de l’agent : permet
AsAIAgent()de convertir un flux de travail en agent, puis d’ajouter l’agent au flux de travail parent. Cela est utile lorsque le flux de travail parent utilise des exécuteurs basés sur l’agent.
Liaison directe avec BindAsExecutor
La BindAsExecutor() méthode d’extension convertit un flux de travail en un ExecutorBinding qui peut être ajouté directement à un flux de travail parent :
using Microsoft.Agents.AI.Workflows;
// Create executors for the inner workflow
UppercaseExecutor uppercase = new();
ReverseExecutor reverse = new();
AppendSuffixExecutor append = new(" [PROCESSED]");
// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(uppercase)
.AddEdge(uppercase, reverse)
.AddEdge(reverse, append)
.WithOutputFrom(append)
.Build();
// Bind the inner workflow as an executor
ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("TextProcessingSubWorkflow");
// Build the parent workflow using the sub-workflow executor
PrefixExecutor prefix = new("INPUT: ");
PostProcessExecutor postProcess = new();
var parentWorkflow = new WorkflowBuilder(prefix)
.AddEdge(prefix, subWorkflowExecutor)
.AddEdge(subWorkflowExecutor, postProcess)
.WithOutputFrom(postProcess)
.Build();
Avec BindAsExecutor, les types d’entrée et de sortie typés du sous-flux de travail sont conservés : le flux de travail parent achemine les messages en fonction des types réels attendus et produits par le sous-flux de travail.
Enveloppement de l’agent avec AsAIAgent
Lorsque le flux de travail parent utilise des exécuteurs basés sur un agent, convertissez le flux de travail interne en agent à l’aide de AsAIAgent(). L’agent WorkflowBuilder est automatiquement encapsulé dans un exécuteur :
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
// Create agents for the inner workflow
AIAgent specialist1 = chatClient.AsAIAgent("You are specialist 1. Analyze the data.");
AIAgent specialist2 = chatClient.AsAIAgent("You are specialist 2. Validate the analysis.");
// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(specialist1)
.AddEdge(specialist1, specialist2)
.Build();
// Convert the inner workflow to an agent
AIAgent innerWorkflowAgent = innerWorkflow.AsAIAgent(
id: "analysis-pipeline",
name: "Analysis Pipeline",
description: "A sub-workflow that analyzes and validates data"
);
// Create agents for the parent workflow
AIAgent coordinator = chatClient.AsAIAgent("You are a coordinator. Delegate tasks to the team.");
AIAgent reviewer = chatClient.AsAIAgent("You are a reviewer. Review the final output.");
// Build the parent workflow with the sub-workflow
var parentWorkflow = new WorkflowBuilder(coordinator)
.AddEdge(coordinator, innerWorkflowAgent)
.AddEdge(innerWorkflowAgent, reviewer)
.Build();
Le flux de travail interne s’exécute en une seule étape du point de vue du flux de travail parent. Le coordinateur envoie des messages au pipeline d’analyse, qui s’exécute specialist1 → specialist2en interne, puis transfère le résultat au réviseur.
Conseil / Astuce
Utilisez-la BindAsExecutor() lors de l’utilisation d’exécuteurs typés et AsAIAgent() lors de l’utilisation de flux de travail basés sur un agent. Pour plus d’informations sur la configuration de la conversion de flux de travail en agent, consultez Flux de travail en tant qu’agents.
Types d’entrée et de sortie
Lorsqu’un flux de travail est utilisé comme sous-flux de travail, il conserve les contrats de type de ses exécuteurs internes.
Avec BindAsExecutor, l’exécuteur de sous-flux de travail accepte les mêmes types d’entrée que l’exécuteur de démarrage du flux de travail interne et envoie les mêmes types de sortie que le flux de travail interne produit. Les arêtes du flux de travail parent doivent connecter des exécuteurs dont les types de sortie correspondent aux types d’entrée attendus du sous-flux de travail, et les types de sortie du sous-flux de travail doivent correspondre aux entrées attendues des exécuteurs en aval.
Avec AsAIAgent, le sous-flux de travail est encapsulé en tant qu'agent et suit les contrats d’entrée/sortie de l'Agent Executor (string, ChatMessage, IEnumerable<ChatMessage>).
Comportement de sortie
Par défaut, lorsqu’un sous-flux de travail produit des sorties (via YieldOutputAsync), ces sorties sont transférées en tant que messages aux exécuteurs connectés dans le flux de travail parent. Cela permet aux exécuteurs en aval de traiter les résultats des sous-flux de travail.
La ExecutorOptions classe contrôle ce comportement :
| Choix | Par défaut | Description |
|---|---|---|
AutoSendMessageHandlerResultObject |
true |
Transférer les sorties de sous-flux de travail en tant que messages aux exécuteurs connectés dans le graphique parent. |
AutoYieldOutputHandlerResultObject |
false |
Déverser les sorties des sous-flux de travail directement dans le flux d'événements de sortie du flux de travail parent. |
LorsquAutoYieldOutputHandlerResultObject est activé, les sorties des sous-flux de travail contournent le routage interne du parent et sont remises directement à l'appelant du flux de travail parent.
var options = new ExecutorOptions
{
AutoYieldOutputHandlerResultObject = true,
};
ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("SubWorkflow", options);
Demandes et réponses
Les sous-flux de travail prennent entièrement en charge le mécanisme de demande et de réponse . Lorsqu'un exécuteur à l'intérieur du sous-flux de travail envoie une demande (par exemple, pour demander une entrée humaine), le WorkflowHostExecutor la transmet au RequestInfoEvent flux de travaux parent avec un ID de port qualifié : l'ID de l'exécuteur de sous-flux de travail est ajouté à l'ID de port (par exemple, SubWorkflow.GuessNumber).
Cette qualification garantit que lorsque le flux de travail parent reçoit une réponse, il peut router la réponse vers l’instance de sous-flux de travail correcte. Le flux de travail parent gère les demandes de sous-flux de travail à l’aide du même mécanisme de réponse que toute autre requête :
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(parentWorkflow, input);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInfoEvt:
// The request may originate from the sub-workflow
// Handle it and send the response back
var response = requestInfoEvt.Request.CreateResponse(myResponseData);
await handle.SendResponseAsync(response);
break;
case WorkflowOutputEvent outputEvt:
Console.WriteLine($"Output: {outputEvt.Data}");
break;
}
}
Note
Du point de vue de l’appelant de flux de travail parent, il n’existe aucune différence entre une requête d’un exécuteur de niveau supérieur et une requête d’un sous-flux de travail. Le cadre gère le routage de manière transparente.
Fonctionnement
Lorsque le flux de travail parent achemine un message vers l’exécuteur de sous-flux de travail :
-
Remise d’entrée : le message est transféré à l’exécuteur de démarrage du flux de travail interne. Avec
BindAsExecutor, le type de message doit correspondre aux types attendus de l’exécuteur de démarrage. AvecAsAIAgent, les messages sont normalisés auChatMessageformat. - Exécution interne : le flux de travail interne exécute sa propre boucle superstep.
-
Collection de sorties : les événements de sortie du flux de travail interne sont collectés. Avec
BindAsExecutor, les sorties conservent leurs types d’origine. AvecAsAIAgent, les sorties sont converties en messages de réponse de l’agent. - Transfert de requête : si le flux de travail interne a des demandes en attente, ils sont transférés au flux de travail parent pour la gestion (voir Demandes et réponses).
- Répartition en aval : les messages résultants sont envoyés au exécuteur suivant dans le flux de travail parent.
Étant donné que le flux de travail interne conserve son propre contexte d’exécution, son état est indépendant du flux de travail parent.
Conseil / Astuce
Pour plus d’informations sur la configuration de la conversion de flux de travail en agent, y compris le comportement de diffusion en continu et la gestion des exceptions, consultez Flux de travail en tant qu’agents.
Imbrication à plusieurs niveaux
Les sous-flux de travail peuvent être imbriqués à une profondeur arbitraire. Chaque niveau conserve son propre contexte d’exécution :
// Level 1: Data preparation pipeline
var dataPipeline = new WorkflowBuilder(fetcher)
.AddEdge(fetcher, cleaner)
.Build();
AIAgent dataPipelineAgent = dataPipeline.AsAIAgent(
id: "data-pipeline",
name: "Data Pipeline"
);
// Level 2: Analysis pipeline (contains the data pipeline)
var analysisPipeline = new WorkflowBuilder(dataPipelineAgent)
.AddEdge(dataPipelineAgent, analyzer)
.Build();
AIAgent analysisPipelineAgent = analysisPipeline.AsAIAgent(
id: "analysis-pipeline",
name: "Analysis Pipeline"
);
// Level 3: Top-level orchestration
var topWorkflow = new WorkflowBuilder(coordinator)
.AddEdge(coordinator, analysisPipelineAgent)
.AddEdge(analysisPipelineAgent, reporter)
.Build();
Note
Chaque niveau d’imbrication ajoute une surcharge d’exécution, car le flux de travail interne exécute sa propre boucle de superétapes. Gardez la profondeur d’imbrication raisonnable pour les scénarios sensibles à la performance.
Gestion des erreurs
En cas d’échec d’un sous-flux de travail, l’erreur est propagée au flux de travail parent en tant que SubworkflowErrorEvent. Le flux de travail parent peut observer ces erreurs par le biais de son flux d’événements :
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
if (evt is SubworkflowErrorEvent subError)
{
Console.WriteLine($"Sub-workflow '{subError.ExecutorId}' failed: {subError.Data}");
}
}
Si le sous-flux de travail rencontre une exception non gérée, l’exécution du flux de travail parent se poursuit, mais l’exécuteur de sous-flux de travail cesse de traiter d’autres messages.
Points de contrôle
Lorsqu’un point de contrôle est effectué sur le flux de travail parent, l’état de session de l’agent de sous-flux de travail est sérialisé dans le cadre des données de point de contrôle de l’exécuteur parent. Lors de la restauration, l’état de session est désérialisé, ce qui permet au flux de travail parent de reprendre avec l’état du sous-flux de travail intact.
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the parent workflow with checkpointing
StreamingRun run = await InProcessExecution
.RunStreamingAsync(parentWorkflow, input, checkpointManager);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// Process events, including those from sub-workflows
}
// Resume from a checkpoint
CheckpointInfo checkpoint = run.Checkpoints[^1];
StreamingRun resumedRun = await InProcessExecution
.ResumeStreamingAsync(parentWorkflow, checkpoint, checkpointManager);
Création d’un Sub-Workflow
En Python, vous créez un sous-flux de travail en encapsulant un Workflow dans un WorkflowExecutor flux de travail parent et en l’ajoutant à un flux de travail parent.
from agent_framework import WorkflowBuilder, WorkflowExecutor
# Create agents for the inner workflow
specialist1 = client.as_agent(name="Specialist1", instructions="Analyze the data.")
specialist2 = client.as_agent(name="Specialist2", instructions="Validate the analysis.")
# Build the inner workflow
inner_workflow = (
WorkflowBuilder(start_executor=specialist1)
.add_edge(specialist1, specialist2)
.build()
)
# Wrap as an executor
inner_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
)
# Create agents for the parent workflow
coordinator = client.as_agent(name="Coordinator", instructions="Delegate tasks to the team.")
reviewer = client.as_agent(name="Reviewer", instructions="Review the final output.")
# Build the parent workflow with the sub-workflow
parent_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, inner_workflow_executor)
.add_edge(inner_workflow_executor, reviewer)
.build()
)
Le flux de travail interne s’exécute en une seule étape du point de vue du flux de travail parent. Le coordinateur envoie des messages au pipeline d’analyse, qui s’exécute specialist1 → specialist2en interne, puis transfère le résultat au réviseur.
Paramètres workflowExecutor
| Paramètre | Type | Par défaut | Description |
|---|---|---|---|
workflow |
Workflow |
— | Instance de workflow à encapsuler en tant qu’exécuteur. |
id |
str |
— | Identificateur unique pour cet exécuteur. |
allow_direct_output |
bool |
False |
Lorsque True, les résultats du sous-flux de travail sont transmis directement au flux d’événements du flux de travail parent au lieu d’être envoyés en tant que messages aux exécuteurs connectés. |
propagate_request |
bool |
False |
Lorsque True, les requêtes du sous-workflow sont propagées au flux d'événements du workflow parent en tant qu'événements de requête standard. Lorsque False, les requêtes sont encapsulées SubWorkflowRequestMessage pour l’interception par les exécuteurs parents. |
Enveloppement implicite vs explicite
Les instances Workflow peuvent être automatiquement enveloppées par WorkflowBuilder dans un WorkflowExecutor lorsque vous les passez directement. Cela est similaire à la façon dont Agent les instances sont automatiquement encapsulées AgentExecutor.
# Implicit wrapping — WorkflowBuilder detects the Workflow and wraps it
parent_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, inner_workflow) # Workflow auto-wrapped
.add_edge(inner_workflow, reviewer)
.build()
)
Utilisez l'enveloppement explicite lorsque vous en avez besoin.
- Attribuez un ID d’exécuteur spécifique pour référence dans plusieurs arêtes.
- Réutilisez la même
WorkflowExecutorinstance dans le graphique.
# Explicit wrapping — create the WorkflowExecutor yourself
inner_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
)
parent_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, inner_workflow_executor)
.add_edge(inner_workflow_executor, reviewer)
.build()
)
Types d’entrée et de sortie
Le WorkflowExecutor hérite de la signature de type du flux de travail encapsulé :
-
Les types d’entrée correspondent aux types d’entrée de l’exécuteur de démarrage du flux de travail encapsulé (plus
SubWorkflowResponseMessagepour la gestion des réponses aux demandes transférées). -
Les types de sortie correspondent aux types de sortie du flux de travail encapsulé. Si un exécuteur dans le sous-flux de travail est capable de répondre aux demandes,
SubWorkflowRequestMessageil est également inclus en tant que type de sortie.
Cela signifie que les arêtes du flux de travail parent doivent connecter des exécuteurs dont les types de sortie correspondent aux types d’entrée attendus du sous-workflow. De même, les exécuteurs en aval doivent accepter les types générés par le sous-flux de travail :
# The sub-workflow's start executor accepts TextProcessingRequest
# So the parent executor must send TextProcessingRequest
class Orchestrator(Executor):
@handler
async def start(self, texts: list[str], ctx: WorkflowContext[TextProcessingRequest]) -> None:
for text in texts:
await ctx.send_message(TextProcessingRequest(text=text))
# The sub-workflow yields TextProcessingResult
# So the downstream executor must handle TextProcessingResult
class ResultCollector(Executor):
@handler
async def collect(self, result: TextProcessingResult, ctx: WorkflowContext) -> None:
print(f"Received: {result}")
Comportement de sortie
Par défaut (allow_direct_output=False), lorsqu’un sous-flux de travail produit des sorties via yield_output, ces sorties sont transférées en tant que messages aux exécuteurs connectés dans le flux de travail parent à l’aide send_messagede . Cela permet aux exécuteurs en aval de traiter les résultats de sous-flux de travail dans le cadre du graphique parent.
Lorsque allow_direct_output=True, les sorties des sous-flux de travail sont directement générées dans le flux d’événements du flux de travail parent. Les sorties du sous-flux de travail deviennent des sorties du flux de travail parent, en contournant le routage de l’exécuteur interne du parent :
# Outputs go directly to parent's event stream
sub_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
allow_direct_output=True,
)
# The caller receives sub-workflow outputs directly
async for event in parent_workflow.run(input_data, stream=True):
if event.type == "output":
# This output came from the sub-workflow
print(event.data)
Demandes et réponses
Les sous-flux de travail prennent entièrement en charge le mécanisme de demande et de réponse . Lorsqu’un exécuteur situé à l’intérieur d’un sous-flux de travail appelle ctx.request_info(), le WorkflowExecutor intercepte la demande et la gère en fonction du paramètre propagate_request.
Interception des requêtes dans le flux de travail parent (par défaut)
Avec propagate_request=False (la valeur par défaut), les requêtes du sous-flux de travail sont encapsulées et SubWorkflowRequestMessage envoyées aux exécuteurs connectés dans le flux de travail parent. Cela permet aux exécuteurs parents de gérer la requête localement :
from agent_framework import (
SubWorkflowRequestMessage,
SubWorkflowResponseMessage,
)
class ParentHandler(Executor):
@handler
async def handle_request(
self,
request: SubWorkflowRequestMessage,
ctx: WorkflowContext[SubWorkflowResponseMessage],
) -> None:
# Inspect the original request from the sub-workflow
original_data = request.source_event.data
# Create and send a response back to the sub-workflow
response = request.create_response(my_response_data)
await ctx.send_message(response, target_id=request.executor_id)
La create_response() méthode valide que le type de données de réponse correspond au type attendu de la requête d’origine. Si les types ne correspondent pas, un TypeError élément est déclenché.
Important
Lors de l’envoi de la réponse, utilisez target_id=request.executor_id pour acheminer le SubWorkflowResponseMessage vers l’instance WorkflowExecutor correcte.
Propagation de requêtes vers des appelants externes
Avec propagate_request=True, les demandes du sous-flux de travail sont propagées au flux d’événements du flux d’événements du flux de travail parent à l’aide du mécanisme standard request_info . L’appelant du flux de travail parent gère ces requêtes de la même façon que toute autre requête humaine dans la boucle :
sub_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
propagate_request=True,
)
# Run the parent workflow and handle propagated requests
result = await parent_workflow.run(input_data)
request_info_events = result.get_request_info_events()
if request_info_events:
responses = {}
for event in request_info_events:
# Handle each request (e.g., ask a human)
responses[event.request_id] = get_human_response(event.data)
result = await parent_workflow.run(responses=responses)
Fonctionnement
Lorsque le flux de travail parent achemine un message vers :WorkflowExecutor
- Remise d’entrée : le message est transféré à l’exécuteur de démarrage du flux de travail interne. Le type de message doit correspondre aux types d’entrée attendus de l’exécuteur de démarrage.
- Exécution interne : le workflow interne exécute sa propre boucle superstep jusqu'à ce qu'elle soit terminée ou jusqu'à ce qu'il ait besoin d'une entrée externe.
-
Collection de sorties : les événements de sortie du flux de travail interne sont collectés et transférés en fonction du
allow_direct_outputparamètre. -
Transfert de requête : si le flux de travail interne a des demandes en attente, ils sont transférés en fonction du
propagate_requestparamètre (voir Demandes et réponses). -
Accumulation de réponse : collecte
WorkflowExecutorles réponses et reprend le sous-flux de travail uniquement lorsque toutes les réponses attendues pour une exécution donnée ont été reçues. - Répartition en aval : les sorties sont envoyées au exécuteur suivant dans le flux de travail parent.
Le sous-flux de travail conserve son propre état interne de manière indépendante par rapport au flux de travail parent. Les messages sont routés uniquement via les arêtes qui connectent le WorkflowExecutor au reste du graphe parent — il n’y a pas de diffusion de message à travers les niveaux d’imbrication.
Imbrication à plusieurs niveaux
Les sous-flux de travail peuvent être imbriqués à une profondeur arbitraire. Chaque niveau conserve son propre contexte d’exécution :
# Level 1: Data preparation pipeline
data_pipeline = (
WorkflowBuilder(start_executor=fetcher)
.add_edge(fetcher, cleaner)
.build()
)
# Level 2: Analysis pipeline (contains the data pipeline)
analysis_pipeline = (
WorkflowBuilder(start_executor=data_pipeline) # Implicit wrapping
.add_edge(data_pipeline, analyzer)
.build()
)
# Level 3: Top-level orchestration
top_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, analysis_pipeline) # Implicit wrapping
.add_edge(analysis_pipeline, reporter)
.build()
)
Note
Chaque niveau d’imbrication ajoute une surcharge d’exécution, car le flux de travail interne exécute sa propre boucle de superétapes. Gardez la profondeur d’imbrication raisonnable pour les scénarios sensibles à la performance.
Avertissement
Toutes les exécutions simultanées d’un WorkflowExecutor partage de la même instance de workflow sous-jacente. Les exécuteurs à l’intérieur du sous-workflow doivent être sans état pour éviter les interférences entre les exécutions simultanées.
Gestion des erreurs
Lorsqu’un sous-flux de travail échoue, l’erreur est propagée au flux de travail parent.
WorkflowExecutor capture l'événement échoué du sous-flux de travail et le convertit en événement d'erreur dans le contexte parent.
async for event in parent_workflow.run(input_data, stream=True):
if event.type == "failed":
print(f"Sub-workflow failed: {event.details.message}")
elif event.type == "output":
print(event.data)
Si le sous-flux de travail rencontre une exception non gérée, le flux de travail parent reçoit un événement d’erreur avec les détails de l’exception, y compris l’ID du sous-flux de travail.
Points de contrôle
Les sous-flux de travail prennent en charge le point de contrôle. Lorsqu’un point de contrôle est effectué sur le flux de travail parent, il WorkflowExecutor sérialise son état interne, y compris la progression de l’exécution du flux de travail interne et tous les messages mis en cache. Lors de la restauration, cet état est désérialisé, ce qui permet au flux de travail parent de reprendre avec le sous-flux de travail intact.
from agent_framework import FileCheckpointStorage, WorkflowBuilder
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
# Build the parent workflow with checkpointing
parent_workflow = (
WorkflowBuilder(
start_executor=coordinator,
checkpoint_storage=checkpoint_storage,
)
.add_edge(coordinator, inner_workflow_executor)
.add_edge(inner_workflow_executor, reviewer)
.build()
)
# Run with automatic checkpointing
async for event in parent_workflow.run("Analyze the dataset", stream=True):
if event.type == "output":
print(event.data)
# Resume from a checkpoint
checkpoints = await checkpoint_storage.list_checkpoints()
async for event in parent_workflow.run(
checkpoint_id=checkpoints[-1].checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
if event.type == "output":
print(event.data)