Partager via


Sous-Flux de Travail

Un sous-flux de travail est un flux de travail complet qui s’exécute en tant qu’exécuteur au sein d’un flux de travail parent. Cela vous permet de composer des systèmes complexes à partir de blocs de construction de flux de travail plus petits et réutilisables, chacun avec son propre contexte d’exécution isolé, la gestion de l’état et le routage des messages.

Aperçu

Les sous-flux de travail sont utiles lorsque vous souhaitez :

  • Décomposition de la complexité : décomposez un flux de travail volumineux en unités testables plus petites et indépendantes.
  • Réutiliser la logique de flux de travail : incorporez le même sous-flux de travail dans plusieurs flux de travail parents.
  • Isoler l’état : conservez l’état interne de chaque sous-flux de travail distinct du parent.
  • Contrôler le flux de données : les messages entrent et quittent le sous-flux de travail uniquement par ses bords, sans diffusion à différents niveaux.

Lorsqu’un sous-flux de travail est ajouté à un flux de travail parent, il se comporte comme n’importe quel autre exécuteur : il reçoit des messages d’entrée, exécute son graphe interne à l’achèvement et produit des messages de sortie pour les exécuteurs en aval.

Création d’un Sub-Workflow

En C#, vous composez des sous-workflows de deux façons :

  • Liaison directe : permet BindAsExecutor() d’incorporer un flux de travail directement en tant qu’exécuteur dans le flux de travail parent. Cela conserve les types d’entrée/sortie natifs du sous-flux de travail.
  • Habillage de l’agent : permet AsAIAgent() de convertir un flux de travail en agent, puis d’ajouter l’agent au flux de travail parent. Cela est utile lorsque le flux de travail parent utilise des exécuteurs basés sur l’agent.

Liaison directe avec BindAsExecutor

La BindAsExecutor() méthode d’extension convertit un flux de travail en un ExecutorBinding qui peut être ajouté directement à un flux de travail parent :

using Microsoft.Agents.AI.Workflows;

// Create executors for the inner workflow
UppercaseExecutor uppercase = new();
ReverseExecutor reverse = new();
AppendSuffixExecutor append = new(" [PROCESSED]");

// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(uppercase)
    .AddEdge(uppercase, reverse)
    .AddEdge(reverse, append)
    .WithOutputFrom(append)
    .Build();

// Bind the inner workflow as an executor
ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("TextProcessingSubWorkflow");

// Build the parent workflow using the sub-workflow executor
PrefixExecutor prefix = new("INPUT: ");
PostProcessExecutor postProcess = new();

var parentWorkflow = new WorkflowBuilder(prefix)
    .AddEdge(prefix, subWorkflowExecutor)
    .AddEdge(subWorkflowExecutor, postProcess)
    .WithOutputFrom(postProcess)
    .Build();

Avec BindAsExecutor, les types d’entrée et de sortie typés du sous-flux de travail sont conservés : le flux de travail parent achemine les messages en fonction des types réels attendus et produits par le sous-flux de travail.

Enveloppement de l’agent avec AsAIAgent

Lorsque le flux de travail parent utilise des exécuteurs basés sur un agent, convertissez le flux de travail interne en agent à l’aide de AsAIAgent(). L’agent WorkflowBuilder est automatiquement encapsulé dans un exécuteur :

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;

// Create agents for the inner workflow
AIAgent specialist1 = chatClient.AsAIAgent("You are specialist 1. Analyze the data.");
AIAgent specialist2 = chatClient.AsAIAgent("You are specialist 2. Validate the analysis.");

// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(specialist1)
    .AddEdge(specialist1, specialist2)
    .Build();

// Convert the inner workflow to an agent
AIAgent innerWorkflowAgent = innerWorkflow.AsAIAgent(
    id: "analysis-pipeline",
    name: "Analysis Pipeline",
    description: "A sub-workflow that analyzes and validates data"
);

// Create agents for the parent workflow
AIAgent coordinator = chatClient.AsAIAgent("You are a coordinator. Delegate tasks to the team.");
AIAgent reviewer = chatClient.AsAIAgent("You are a reviewer. Review the final output.");

// Build the parent workflow with the sub-workflow
var parentWorkflow = new WorkflowBuilder(coordinator)
    .AddEdge(coordinator, innerWorkflowAgent)
    .AddEdge(innerWorkflowAgent, reviewer)
    .Build();

Le flux de travail interne s’exécute en une seule étape du point de vue du flux de travail parent. Le coordinateur envoie des messages au pipeline d’analyse, qui s’exécute specialist1 → specialist2en interne, puis transfère le résultat au réviseur.

Conseil / Astuce

Utilisez-la BindAsExecutor() lors de l’utilisation d’exécuteurs typés et AsAIAgent() lors de l’utilisation de flux de travail basés sur un agent. Pour plus d’informations sur la configuration de la conversion de flux de travail en agent, consultez Flux de travail en tant qu’agents.

Types d’entrée et de sortie

Lorsqu’un flux de travail est utilisé comme sous-flux de travail, il conserve les contrats de type de ses exécuteurs internes.

Avec BindAsExecutor, l’exécuteur de sous-flux de travail accepte les mêmes types d’entrée que l’exécuteur de démarrage du flux de travail interne et envoie les mêmes types de sortie que le flux de travail interne produit. Les arêtes du flux de travail parent doivent connecter des exécuteurs dont les types de sortie correspondent aux types d’entrée attendus du sous-flux de travail, et les types de sortie du sous-flux de travail doivent correspondre aux entrées attendues des exécuteurs en aval.

Avec AsAIAgent, le sous-flux de travail est encapsulé en tant qu'agent et suit les contrats d’entrée/sortie de l'Agent Executor (string, ChatMessage, IEnumerable<ChatMessage>).

Comportement de sortie

Par défaut, lorsqu’un sous-flux de travail produit des sorties (via YieldOutputAsync), ces sorties sont transférées en tant que messages aux exécuteurs connectés dans le flux de travail parent. Cela permet aux exécuteurs en aval de traiter les résultats des sous-flux de travail.

La ExecutorOptions classe contrôle ce comportement :

Choix Par défaut Description
AutoSendMessageHandlerResultObject true Transférer les sorties de sous-flux de travail en tant que messages aux exécuteurs connectés dans le graphique parent.
AutoYieldOutputHandlerResultObject false Déverser les sorties des sous-flux de travail directement dans le flux d'événements de sortie du flux de travail parent.

LorsquAutoYieldOutputHandlerResultObject est activé, les sorties des sous-flux de travail contournent le routage interne du parent et sont remises directement à l'appelant du flux de travail parent.

var options = new ExecutorOptions
{
    AutoYieldOutputHandlerResultObject = true,
};

ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("SubWorkflow", options);

Demandes et réponses

Les sous-flux de travail prennent entièrement en charge le mécanisme de demande et de réponse . Lorsqu'un exécuteur à l'intérieur du sous-flux de travail envoie une demande (par exemple, pour demander une entrée humaine), le WorkflowHostExecutor la transmet au RequestInfoEvent flux de travaux parent avec un ID de port qualifié : l'ID de l'exécuteur de sous-flux de travail est ajouté à l'ID de port (par exemple, SubWorkflow.GuessNumber).

Cette qualification garantit que lorsque le flux de travail parent reçoit une réponse, il peut router la réponse vers l’instance de sous-flux de travail correcte. Le flux de travail parent gère les demandes de sous-flux de travail à l’aide du même mécanisme de réponse que toute autre requête :

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(parentWorkflow, input);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInfoEvt:
            // The request may originate from the sub-workflow
            // Handle it and send the response back
            var response = requestInfoEvt.Request.CreateResponse(myResponseData);
            await handle.SendResponseAsync(response);
            break;

        case WorkflowOutputEvent outputEvt:
            Console.WriteLine($"Output: {outputEvt.Data}");
            break;
    }
}

Note

Du point de vue de l’appelant de flux de travail parent, il n’existe aucune différence entre une requête d’un exécuteur de niveau supérieur et une requête d’un sous-flux de travail. Le cadre gère le routage de manière transparente.

Fonctionnement

Lorsque le flux de travail parent achemine un message vers l’exécuteur de sous-flux de travail :

  1. Remise d’entrée : le message est transféré à l’exécuteur de démarrage du flux de travail interne. Avec BindAsExecutor, le type de message doit correspondre aux types attendus de l’exécuteur de démarrage. Avec AsAIAgent, les messages sont normalisés au ChatMessage format.
  2. Exécution interne : le flux de travail interne exécute sa propre boucle superstep.
  3. Collection de sorties : les événements de sortie du flux de travail interne sont collectés. Avec BindAsExecutor, les sorties conservent leurs types d’origine. Avec AsAIAgent, les sorties sont converties en messages de réponse de l’agent.
  4. Transfert de requête : si le flux de travail interne a des demandes en attente, ils sont transférés au flux de travail parent pour la gestion (voir Demandes et réponses).
  5. Répartition en aval : les messages résultants sont envoyés au exécuteur suivant dans le flux de travail parent.

Étant donné que le flux de travail interne conserve son propre contexte d’exécution, son état est indépendant du flux de travail parent.

Conseil / Astuce

Pour plus d’informations sur la configuration de la conversion de flux de travail en agent, y compris le comportement de diffusion en continu et la gestion des exceptions, consultez Flux de travail en tant qu’agents.

Imbrication à plusieurs niveaux

Les sous-flux de travail peuvent être imbriqués à une profondeur arbitraire. Chaque niveau conserve son propre contexte d’exécution :

// Level 1: Data preparation pipeline
var dataPipeline = new WorkflowBuilder(fetcher)
    .AddEdge(fetcher, cleaner)
    .Build();

AIAgent dataPipelineAgent = dataPipeline.AsAIAgent(
    id: "data-pipeline",
    name: "Data Pipeline"
);

// Level 2: Analysis pipeline (contains the data pipeline)
var analysisPipeline = new WorkflowBuilder(dataPipelineAgent)
    .AddEdge(dataPipelineAgent, analyzer)
    .Build();

AIAgent analysisPipelineAgent = analysisPipeline.AsAIAgent(
    id: "analysis-pipeline",
    name: "Analysis Pipeline"
);

// Level 3: Top-level orchestration
var topWorkflow = new WorkflowBuilder(coordinator)
    .AddEdge(coordinator, analysisPipelineAgent)
    .AddEdge(analysisPipelineAgent, reporter)
    .Build();

Note

Chaque niveau d’imbrication ajoute une surcharge d’exécution, car le flux de travail interne exécute sa propre boucle de superétapes. Gardez la profondeur d’imbrication raisonnable pour les scénarios sensibles à la performance.

Gestion des erreurs

En cas d’échec d’un sous-flux de travail, l’erreur est propagée au flux de travail parent en tant que SubworkflowErrorEvent. Le flux de travail parent peut observer ces erreurs par le biais de son flux d’événements :

await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    if (evt is SubworkflowErrorEvent subError)
    {
        Console.WriteLine($"Sub-workflow '{subError.ExecutorId}' failed: {subError.Data}");
    }
}

Si le sous-flux de travail rencontre une exception non gérée, l’exécution du flux de travail parent se poursuit, mais l’exécuteur de sous-flux de travail cesse de traiter d’autres messages.

Points de contrôle

Lorsqu’un point de contrôle est effectué sur le flux de travail parent, l’état de session de l’agent de sous-flux de travail est sérialisé dans le cadre des données de point de contrôle de l’exécuteur parent. Lors de la restauration, l’état de session est désérialisé, ce qui permet au flux de travail parent de reprendre avec l’état du sous-flux de travail intact.

CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the parent workflow with checkpointing
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(parentWorkflow, input, checkpointManager);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    // Process events, including those from sub-workflows
}

// Resume from a checkpoint
CheckpointInfo checkpoint = run.Checkpoints[^1];
StreamingRun resumedRun = await InProcessExecution
    .ResumeStreamingAsync(parentWorkflow, checkpoint, checkpointManager);

Création d’un Sub-Workflow

En Python, vous créez un sous-flux de travail en encapsulant un Workflow dans un WorkflowExecutor flux de travail parent et en l’ajoutant à un flux de travail parent.

from agent_framework import WorkflowBuilder, WorkflowExecutor

# Create agents for the inner workflow
specialist1 = client.as_agent(name="Specialist1", instructions="Analyze the data.")
specialist2 = client.as_agent(name="Specialist2", instructions="Validate the analysis.")

# Build the inner workflow
inner_workflow = (
    WorkflowBuilder(start_executor=specialist1)
    .add_edge(specialist1, specialist2)
    .build()
)

# Wrap as an executor
inner_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
)

# Create agents for the parent workflow
coordinator = client.as_agent(name="Coordinator", instructions="Delegate tasks to the team.")
reviewer = client.as_agent(name="Reviewer", instructions="Review the final output.")

# Build the parent workflow with the sub-workflow
parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

Le flux de travail interne s’exécute en une seule étape du point de vue du flux de travail parent. Le coordinateur envoie des messages au pipeline d’analyse, qui s’exécute specialist1 → specialist2en interne, puis transfère le résultat au réviseur.

Paramètres workflowExecutor

Paramètre Type Par défaut Description
workflow Workflow Instance de workflow à encapsuler en tant qu’exécuteur.
id str Identificateur unique pour cet exécuteur.
allow_direct_output bool False Lorsque True, les résultats du sous-flux de travail sont transmis directement au flux d’événements du flux de travail parent au lieu d’être envoyés en tant que messages aux exécuteurs connectés.
propagate_request bool False Lorsque True, les requêtes du sous-workflow sont propagées au flux d'événements du workflow parent en tant qu'événements de requête standard. Lorsque False, les requêtes sont encapsulées SubWorkflowRequestMessage pour l’interception par les exécuteurs parents.

Enveloppement implicite vs explicite

Les instances Workflow peuvent être automatiquement enveloppées par WorkflowBuilder dans un WorkflowExecutor lorsque vous les passez directement. Cela est similaire à la façon dont Agent les instances sont automatiquement encapsulées AgentExecutor.

# Implicit wrapping — WorkflowBuilder detects the Workflow and wraps it
parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow)    # Workflow auto-wrapped
    .add_edge(inner_workflow, reviewer)
    .build()
)

Utilisez l'enveloppement explicite lorsque vous en avez besoin.

  • Attribuez un ID d’exécuteur spécifique pour référence dans plusieurs arêtes.
  • Réutilisez la même WorkflowExecutor instance dans le graphique.
# Explicit wrapping — create the WorkflowExecutor yourself
inner_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
)

parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

Types d’entrée et de sortie

Le WorkflowExecutor hérite de la signature de type du flux de travail encapsulé :

  • Les types d’entrée correspondent aux types d’entrée de l’exécuteur de démarrage du flux de travail encapsulé (plus SubWorkflowResponseMessage pour la gestion des réponses aux demandes transférées).
  • Les types de sortie correspondent aux types de sortie du flux de travail encapsulé. Si un exécuteur dans le sous-flux de travail est capable de répondre aux demandes, SubWorkflowRequestMessage il est également inclus en tant que type de sortie.

Cela signifie que les arêtes du flux de travail parent doivent connecter des exécuteurs dont les types de sortie correspondent aux types d’entrée attendus du sous-workflow. De même, les exécuteurs en aval doivent accepter les types générés par le sous-flux de travail :

# The sub-workflow's start executor accepts TextProcessingRequest
# So the parent executor must send TextProcessingRequest
class Orchestrator(Executor):
    @handler
    async def start(self, texts: list[str], ctx: WorkflowContext[TextProcessingRequest]) -> None:
        for text in texts:
            await ctx.send_message(TextProcessingRequest(text=text))

# The sub-workflow yields TextProcessingResult
# So the downstream executor must handle TextProcessingResult
class ResultCollector(Executor):
    @handler
    async def collect(self, result: TextProcessingResult, ctx: WorkflowContext) -> None:
        print(f"Received: {result}")

Comportement de sortie

Par défaut (allow_direct_output=False), lorsqu’un sous-flux de travail produit des sorties via yield_output, ces sorties sont transférées en tant que messages aux exécuteurs connectés dans le flux de travail parent à l’aide send_messagede . Cela permet aux exécuteurs en aval de traiter les résultats de sous-flux de travail dans le cadre du graphique parent.

Lorsque allow_direct_output=True, les sorties des sous-flux de travail sont directement générées dans le flux d’événements du flux de travail parent. Les sorties du sous-flux de travail deviennent des sorties du flux de travail parent, en contournant le routage de l’exécuteur interne du parent :

# Outputs go directly to parent's event stream
sub_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
    allow_direct_output=True,
)

# The caller receives sub-workflow outputs directly
async for event in parent_workflow.run(input_data, stream=True):
    if event.type == "output":
        # This output came from the sub-workflow
        print(event.data)

Demandes et réponses

Les sous-flux de travail prennent entièrement en charge le mécanisme de demande et de réponse . Lorsqu’un exécuteur situé à l’intérieur d’un sous-flux de travail appelle ctx.request_info(), le WorkflowExecutor intercepte la demande et la gère en fonction du paramètre propagate_request.

Interception des requêtes dans le flux de travail parent (par défaut)

Avec propagate_request=False (la valeur par défaut), les requêtes du sous-flux de travail sont encapsulées et SubWorkflowRequestMessage envoyées aux exécuteurs connectés dans le flux de travail parent. Cela permet aux exécuteurs parents de gérer la requête localement :

from agent_framework import (
    SubWorkflowRequestMessage,
    SubWorkflowResponseMessage,
)


class ParentHandler(Executor):
    @handler
    async def handle_request(
        self,
        request: SubWorkflowRequestMessage,
        ctx: WorkflowContext[SubWorkflowResponseMessage],
    ) -> None:
        # Inspect the original request from the sub-workflow
        original_data = request.source_event.data

        # Create and send a response back to the sub-workflow
        response = request.create_response(my_response_data)
        await ctx.send_message(response, target_id=request.executor_id)

La create_response() méthode valide que le type de données de réponse correspond au type attendu de la requête d’origine. Si les types ne correspondent pas, un TypeError élément est déclenché.

Important

Lors de l’envoi de la réponse, utilisez target_id=request.executor_id pour acheminer le SubWorkflowResponseMessage vers l’instance WorkflowExecutor correcte.

Propagation de requêtes vers des appelants externes

Avec propagate_request=True, les demandes du sous-flux de travail sont propagées au flux d’événements du flux d’événements du flux de travail parent à l’aide du mécanisme standard request_info . L’appelant du flux de travail parent gère ces requêtes de la même façon que toute autre requête humaine dans la boucle :

sub_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
    propagate_request=True,
)

# Run the parent workflow and handle propagated requests
result = await parent_workflow.run(input_data)
request_info_events = result.get_request_info_events()
if request_info_events:
    responses = {}
    for event in request_info_events:
        # Handle each request (e.g., ask a human)
        responses[event.request_id] = get_human_response(event.data)
    result = await parent_workflow.run(responses=responses)

Fonctionnement

Lorsque le flux de travail parent achemine un message vers :WorkflowExecutor

  1. Remise d’entrée : le message est transféré à l’exécuteur de démarrage du flux de travail interne. Le type de message doit correspondre aux types d’entrée attendus de l’exécuteur de démarrage.
  2. Exécution interne : le workflow interne exécute sa propre boucle superstep jusqu'à ce qu'elle soit terminée ou jusqu'à ce qu'il ait besoin d'une entrée externe.
  3. Collection de sorties : les événements de sortie du flux de travail interne sont collectés et transférés en fonction du allow_direct_output paramètre.
  4. Transfert de requête : si le flux de travail interne a des demandes en attente, ils sont transférés en fonction du propagate_request paramètre (voir Demandes et réponses).
  5. Accumulation de réponse : collecte WorkflowExecutor les réponses et reprend le sous-flux de travail uniquement lorsque toutes les réponses attendues pour une exécution donnée ont été reçues.
  6. Répartition en aval : les sorties sont envoyées au exécuteur suivant dans le flux de travail parent.

Le sous-flux de travail conserve son propre état interne de manière indépendante par rapport au flux de travail parent. Les messages sont routés uniquement via les arêtes qui connectent le WorkflowExecutor au reste du graphe parent — il n’y a pas de diffusion de message à travers les niveaux d’imbrication.

Imbrication à plusieurs niveaux

Les sous-flux de travail peuvent être imbriqués à une profondeur arbitraire. Chaque niveau conserve son propre contexte d’exécution :

# Level 1: Data preparation pipeline
data_pipeline = (
    WorkflowBuilder(start_executor=fetcher)
    .add_edge(fetcher, cleaner)
    .build()
)

# Level 2: Analysis pipeline (contains the data pipeline)
analysis_pipeline = (
    WorkflowBuilder(start_executor=data_pipeline)  # Implicit wrapping
    .add_edge(data_pipeline, analyzer)
    .build()
)

# Level 3: Top-level orchestration
top_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, analysis_pipeline)       # Implicit wrapping
    .add_edge(analysis_pipeline, reporter)
    .build()
)

Note

Chaque niveau d’imbrication ajoute une surcharge d’exécution, car le flux de travail interne exécute sa propre boucle de superétapes. Gardez la profondeur d’imbrication raisonnable pour les scénarios sensibles à la performance.

Avertissement

Toutes les exécutions simultanées d’un WorkflowExecutor partage de la même instance de workflow sous-jacente. Les exécuteurs à l’intérieur du sous-workflow doivent être sans état pour éviter les interférences entre les exécutions simultanées.

Gestion des erreurs

Lorsqu’un sous-flux de travail échoue, l’erreur est propagée au flux de travail parent. WorkflowExecutor capture l'événement échoué du sous-flux de travail et le convertit en événement d'erreur dans le contexte parent.

async for event in parent_workflow.run(input_data, stream=True):
    if event.type == "failed":
        print(f"Sub-workflow failed: {event.details.message}")
    elif event.type == "output":
        print(event.data)

Si le sous-flux de travail rencontre une exception non gérée, le flux de travail parent reçoit un événement d’erreur avec les détails de l’exception, y compris l’ID du sous-flux de travail.

Points de contrôle

Les sous-flux de travail prennent en charge le point de contrôle. Lorsqu’un point de contrôle est effectué sur le flux de travail parent, il WorkflowExecutor sérialise son état interne, y compris la progression de l’exécution du flux de travail interne et tous les messages mis en cache. Lors de la restauration, cet état est désérialisé, ce qui permet au flux de travail parent de reprendre avec le sous-flux de travail intact.

from agent_framework import FileCheckpointStorage, WorkflowBuilder

checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

# Build the parent workflow with checkpointing
parent_workflow = (
    WorkflowBuilder(
        start_executor=coordinator,
        checkpoint_storage=checkpoint_storage,
    )
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

# Run with automatic checkpointing
async for event in parent_workflow.run("Analyze the dataset", stream=True):
    if event.type == "output":
        print(event.data)

# Resume from a checkpoint
checkpoints = await checkpoint_storage.list_checkpoints()
async for event in parent_workflow.run(
    checkpoint_id=checkpoints[-1].checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    if event.type == "output":
        print(event.data)

Étapes suivantes