Compartilhar via


Subfluxos de Trabalho

Um sub-fluxo de trabalho é um fluxo de trabalho completo que é executado como um executor dentro de um fluxo de trabalho pai. Isso permite que você componha sistemas complexos de blocos de construção de fluxo de trabalho menores e reutilizáveis, cada um com seu próprio contexto de execução isolado, gerenciamento de estado e roteamento de mensagens.

Visão geral

Os sub-fluxos de trabalho são úteis quando você deseja:

  • Decompor a complexidade — divida um grande fluxo de trabalho em unidades menores e independentemente testáveis.
  • Reutilizar lógica de fluxo de trabalho – insira o mesmo sub-fluxo de trabalho em vários fluxos de trabalho pai.
  • Isolar o estado – mantenha o estado interno de cada subfluxo de trabalho separado do fluxo de trabalho pai.
  • Controlar o fluxo de dados — as mensagens entram e saem do subfluxo de trabalho somente por meio de suas bordas, sem transmissão transversal entre níveis.

Quando um sub-fluxo de trabalho é adicionado a um fluxo de trabalho pai, ele se comporta como qualquer outro executor: ele recebe mensagens de entrada, executa seu grafo interno até a conclusão e produz mensagens de saída para executores downstream.

Criando um Sub-Workflow

No C#, você redigi os sub-fluxos de trabalho de duas maneiras:

  • Associação direta – use BindAsExecutor() para inserir um fluxo de trabalho diretamente como executor no fluxo de trabalho pai. Isso preserva os tipos nativos de entrada/saída do sub-fluxo de trabalho.
  • Encapsulamento do agente – use AsAIAgent() para converter um fluxo de trabalho em um agente e, em seguida, adicione o agente ao fluxo de trabalho pai. Isso é útil quando o fluxo de trabalho pai usa executores baseados em agente.

Associação direta com BindAsExecutor

O BindAsExecutor() método de extensão converte um fluxo de trabalho em um ExecutorBinding que pode ser adicionado diretamente a um fluxo de trabalho pai:

using Microsoft.Agents.AI.Workflows;

// Create executors for the inner workflow
UppercaseExecutor uppercase = new();
ReverseExecutor reverse = new();
AppendSuffixExecutor append = new(" [PROCESSED]");

// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(uppercase)
    .AddEdge(uppercase, reverse)
    .AddEdge(reverse, append)
    .WithOutputFrom(append)
    .Build();

// Bind the inner workflow as an executor
ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("TextProcessingSubWorkflow");

// Build the parent workflow using the sub-workflow executor
PrefixExecutor prefix = new("INPUT: ");
PostProcessExecutor postProcess = new();

var parentWorkflow = new WorkflowBuilder(prefix)
    .AddEdge(prefix, subWorkflowExecutor)
    .AddEdge(subWorkflowExecutor, postProcess)
    .WithOutputFrom(postProcess)
    .Build();

Com BindAsExecutoros tipos de entrada e saída tipados do sub-fluxo de trabalho são preservados, o fluxo de trabalho pai roteia mensagens com base nos tipos reais que o sub-fluxo de trabalho espera e produz.

Encapsulamento de agente com AsAIAgent

Quando o fluxo de trabalho pai usa executores baseados em agente, converta o fluxo de trabalho interno em um agente usando AsAIAgent(). O WorkflowBuilder encapsula automaticamente o agente em um executor:

using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;

// Create agents for the inner workflow
AIAgent specialist1 = chatClient.AsAIAgent("You are specialist 1. Analyze the data.");
AIAgent specialist2 = chatClient.AsAIAgent("You are specialist 2. Validate the analysis.");

// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(specialist1)
    .AddEdge(specialist1, specialist2)
    .Build();

// Convert the inner workflow to an agent
AIAgent innerWorkflowAgent = innerWorkflow.AsAIAgent(
    id: "analysis-pipeline",
    name: "Analysis Pipeline",
    description: "A sub-workflow that analyzes and validates data"
);

// Create agents for the parent workflow
AIAgent coordinator = chatClient.AsAIAgent("You are a coordinator. Delegate tasks to the team.");
AIAgent reviewer = chatClient.AsAIAgent("You are a reviewer. Review the final output.");

// Build the parent workflow with the sub-workflow
var parentWorkflow = new WorkflowBuilder(coordinator)
    .AddEdge(coordinator, innerWorkflowAgent)
    .AddEdge(innerWorkflowAgent, reviewer)
    .Build();

O fluxo de trabalho interno opera como uma etapa única do ponto de vista do fluxo de trabalho pai. O coordenador envia mensagens para o pipeline de análise, que executa specialist1 → specialist2 internamente e, em seguida, encaminha o resultado para o revisor.

Dica

Use BindAsExecutor() ao trabalhar com executores tipados e AsAIAgent() ao trabalhar com workflows baseados em agentes. Para obter detalhes sobre como configurar a conversão de fluxo de trabalho para agente, consulte Fluxos de Trabalho como Agentes.

Tipos de entrada e saída

Quando um fluxo de trabalho é usado como um sub-fluxo de trabalho, ele preserva os contratos de tipo de seus executores internos.

Com BindAsExecutoro executor de sub-fluxo de trabalho aceita os mesmos tipos de entrada que o executor inicial do fluxo de trabalho interno e envia os mesmos tipos de saída que o fluxo de trabalho interno produz. As bordas do fluxo de trabalho pai devem conectar executores cujos tipos de saída correspondem aos tipos de entrada esperados do subfluxo de trabalho, e os tipos de saída do subfluxo de trabalho devem corresponder às entradas esperadas dos executores posteriores.

Com , o sub-fluxo de trabalho é transformado como um agente e segue os contratos de entrada/saída do Executor de Agente (, , , ).

Comportamento de saída

Por padrão, quando um sub-fluxo de trabalho produz saídas (via YieldOutputAsync), essas saídas são encaminhadas como mensagens para executores conectados no fluxo de trabalho pai. Isso permite que executores downstream processem resultados de sub-fluxo de trabalho.

A ExecutorOptions classe controla esse comportamento:

Opção Default Descrição
AutoSendMessageHandlerResultObject true Encaminhar resultados de subfluxo de trabalho como mensagens para executores conectados do grafo pai.
AutoYieldOutputHandlerResultObject false Produza saídas de sub-fluxo de trabalho diretamente para o fluxo de eventos de saída do fluxo de trabalho pai.

Quando AutoYieldOutputHandlerResultObject estiver habilitado, as saídas de sub-fluxo de trabalho ignoram o roteamento interno do pai e são entregues diretamente ao chamador do fluxo de trabalho pai.

var options = new ExecutorOptions
{
    AutoYieldOutputHandlerResultObject = true,
};

ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("SubWorkflow", options);

Solicitações e respostas

Os sub-fluxos de trabalho dão suporte total ao mecanismo de solicitação e resposta . Quando um executor dentro do subfluxo de trabalho envia uma solicitação (por exemplo, para solicitar entrada humana), o WorkflowHostExecutor encaminha ao RequestInfoEvent workflow principal com uma ID de porta qualificada — o ID do executor do subfluxo de trabalho é anexado ao ID da porta (por exemplo, SubWorkflow.GuessNumber).

Essa qualificação garante que, quando o fluxo de trabalho pai receber uma resposta, ele possa rotear a resposta de volta para a instância de subfluxo de trabalho correta. O fluxo de trabalho pai lida com solicitações de sub-fluxo de trabalho usando o mesmo mecanismo de resposta que qualquer outra solicitação:

await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(parentWorkflow, input);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    switch (evt)
    {
        case RequestInfoEvent requestInfoEvt:
            // The request may originate from the sub-workflow
            // Handle it and send the response back
            var response = requestInfoEvt.Request.CreateResponse(myResponseData);
            await handle.SendResponseAsync(response);
            break;

        case WorkflowOutputEvent outputEvt:
            Console.WriteLine($"Output: {outputEvt.Data}");
            break;
    }
}

Observação

Da perspectiva do chamador do fluxo de trabalho pai, não há diferença entre uma requisição de um executor principal e uma requisição de um sub-fluxo de trabalho. O framework lida com o roteamento de forma transparente.

Como funciona

Quando o fluxo de trabalho pai roteia uma mensagem para o executor de subfluxo:

  1. Entrega de entrada – a mensagem é encaminhada para o executor inicial do fluxo de trabalho interno. Com BindAsExecutor, o tipo de mensagem deve corresponder aos tipos esperados do executor inicial. Com AsAIAgent, as mensagens são normalizadas para ChatMessage formato.
  2. Execução interna – o fluxo de trabalho interno executa seu próprio loop superstep.
  3. Coleção de saída – os eventos de saída do fluxo de trabalho interno são coletados. Com BindAsExecutor, as saídas mantêm seus tipos originais. Com AsAIAgent, as saídas são convertidas em mensagens de resposta do agente.
  4. Encaminhamento de solicitação – se o fluxo de trabalho interno tiver solicitações pendentes, elas serão encaminhadas para o fluxo de trabalho pai para tratamento (consulte Solicitações e Respostas).
  5. Envio downstream – as mensagens resultantes são enviadas no fluxo de trabalho pai para o próximo executor.

Como o fluxo de trabalho interno mantém seu próprio contexto de execução, seu estado é independente do fluxo de trabalho pai.

Dica

Para obter detalhes sobre como configurar a conversão de fluxo de trabalho para agente, incluindo comportamento de streaming e tratamento de exceção, consulte Fluxos de Trabalho como Agentes.

Aninhamento de vários níveis

Os sub-fluxos de trabalho podem ser aninhados em profundidade arbitrária. Cada nível mantém seu próprio contexto de execução:

// Level 1: Data preparation pipeline
var dataPipeline = new WorkflowBuilder(fetcher)
    .AddEdge(fetcher, cleaner)
    .Build();

AIAgent dataPipelineAgent = dataPipeline.AsAIAgent(
    id: "data-pipeline",
    name: "Data Pipeline"
);

// Level 2: Analysis pipeline (contains the data pipeline)
var analysisPipeline = new WorkflowBuilder(dataPipelineAgent)
    .AddEdge(dataPipelineAgent, analyzer)
    .Build();

AIAgent analysisPipelineAgent = analysisPipeline.AsAIAgent(
    id: "analysis-pipeline",
    name: "Analysis Pipeline"
);

// Level 3: Top-level orchestration
var topWorkflow = new WorkflowBuilder(coordinator)
    .AddEdge(coordinator, analysisPipelineAgent)
    .AddEdge(analysisPipelineAgent, reporter)
    .Build();

Observação

Cada nível de aninhamento adiciona sobrecarga de execução porque o fluxo de trabalho interno executa seu próprio loop de superpasso. Mantenha a profundidade de aninhamento razoável para cenários sensíveis ao desempenho.

Tratamento de erros

Quando um sub-fluxo de trabalho falha, o erro é propagado para o fluxo de trabalho pai como um SubworkflowErrorEvent. O fluxo de trabalho pai pode observar esses erros por meio de seu fluxo de eventos:

await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
    if (evt is SubworkflowErrorEvent subError)
    {
        Console.WriteLine($"Sub-workflow '{subError.ExecutorId}' failed: {subError.Data}");
    }
}

Se o subfluxo de trabalho encontrar uma exceção sem tratamento, a execução do fluxo de trabalho pai continuará, mas o executor do subfluxo de trabalho parará de processar novas mensagens.

Definindo o ponto de verificação

Quando um ponto de verificação é feito no fluxo de trabalho pai, o estado da sessão do agente do subfluxo de trabalho é serializado como parte dos dados de ponto de verificação do executor pai. Na restauração, o estado da sessão é desserializado, permitindo que o fluxo de trabalho pai seja retomado com o estado do subfluxo de trabalho intacto.

CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();

// Run the parent workflow with checkpointing
StreamingRun run = await InProcessExecution
    .RunStreamingAsync(parentWorkflow, input, checkpointManager);

await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    // Process events, including those from sub-workflows
}

// Resume from a checkpoint
CheckpointInfo checkpoint = run.Checkpoints[^1];
StreamingRun resumedRun = await InProcessExecution
    .ResumeStreamingAsync(parentWorkflow, checkpoint, checkpointManager);

Criando um Sub-Workflow

No Python, você cria um sub-fluxo de trabalho encapsulando um Workflow em um WorkflowExecutor e adicionando-o a um fluxo de trabalho pai.

from agent_framework import WorkflowBuilder, WorkflowExecutor

# Create agents for the inner workflow
specialist1 = client.as_agent(name="Specialist1", instructions="Analyze the data.")
specialist2 = client.as_agent(name="Specialist2", instructions="Validate the analysis.")

# Build the inner workflow
inner_workflow = (
    WorkflowBuilder(start_executor=specialist1)
    .add_edge(specialist1, specialist2)
    .build()
)

# Wrap as an executor
inner_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
)

# Create agents for the parent workflow
coordinator = client.as_agent(name="Coordinator", instructions="Delegate tasks to the team.")
reviewer = client.as_agent(name="Reviewer", instructions="Review the final output.")

# Build the parent workflow with the sub-workflow
parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

O fluxo de trabalho interno opera como uma etapa única do ponto de vista do fluxo de trabalho pai. O coordenador envia mensagens para o pipeline de análise, que executa specialist1 → specialist2 internamente e, em seguida, encaminha o resultado para o revisor.

Os parâmetros do WorkflowExecutor

Parâmetro Tipo Default Descrição
workflow Workflow A instância de fluxo de trabalho a ser designada como executor.
id str Identificador exclusivo para este executor.
allow_direct_output bool False Quando True as saídas de subfluxo de trabalho são enviadas diretamente para o fluxo de eventos do workflow pai, em vez de serem enviadas como mensagens para executores associados.
propagate_request bool False Quando True, as solicitações do sub-fluxo de trabalho são propagadas para a corrente de eventos do workflow principal como eventos regulares de informações de solicitação. Quando False, as solicitações são encapsuladas em SubWorkflowRequestMessage para serem interceptadas por executores principais.

Encapsulamento Implícito versus Explícito

O WorkflowBuilder pode encapsular automaticamente as instâncias Workflow em um WorkflowExecutor quando você as passa diretamente. Isso é semelhante a como as instâncias Agent são encapsuladas automaticamente AgentExecutor.

# Implicit wrapping — WorkflowBuilder detects the Workflow and wraps it
parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow)    # Workflow auto-wrapped
    .add_edge(inner_workflow, reviewer)
    .build()
)

Use o encapsulamento explícito quando precisar:

  • Atribua uma ID de executor específica para referência em várias bordas.
  • Reutilize a mesma WorkflowExecutor instância no grafo.
# Explicit wrapping — create the WorkflowExecutor yourself
inner_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
)

parent_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

Tipos de entrada e saída

O WorkflowExecutor herda sua assinatura de tipo do fluxo de trabalho encapsulado:

  • Os tipos de entrada correspondem aos tipos de entrada do executor inicial do fluxo de trabalho encapsulado (além de SubWorkflowResponseMessage lidar com respostas a solicitações encaminhadas).
  • Os tipos de saída correspondem aos tipos de saída do fluxo de trabalho encapsulado. Se qualquer executor no sub-fluxo de trabalho for capaz de solicitação-resposta, SubWorkflowRequestMessage também será incluído como um tipo de saída.

Isso significa que as bordas do fluxo de trabalho pai devem conectar executores cujos tipos de saída correspondem aos tipos de entrada esperados do sub-fluxo de trabalho. Da mesma forma, os executores downstream devem aceitar os tipos que o sub-fluxo de trabalho produz:

# The sub-workflow's start executor accepts TextProcessingRequest
# So the parent executor must send TextProcessingRequest
class Orchestrator(Executor):
    @handler
    async def start(self, texts: list[str], ctx: WorkflowContext[TextProcessingRequest]) -> None:
        for text in texts:
            await ctx.send_message(TextProcessingRequest(text=text))

# The sub-workflow yields TextProcessingResult
# So the downstream executor must handle TextProcessingResult
class ResultCollector(Executor):
    @handler
    async def collect(self, result: TextProcessingResult, ctx: WorkflowContext) -> None:
        print(f"Received: {result}")

Comportamento de saída

Por padrão (allow_direct_output=False), quando um sub-fluxo de trabalho produz saídas por meio yield_output, essas saídas são encaminhadas como mensagens para executores conectados no fluxo de trabalho pai usando send_message. Isso permite que executores a jusante processem resultados de subfluxo de trabalho como parte do grafo pai.

Quando allow_direct_output=True as saídas de subfluxo de trabalho são enviadas diretamente ao fluxo de eventos do fluxo de trabalho pai. As saídas do sub-fluxo de trabalho tornam-se saídas do fluxo de trabalho pai, ignorando o roteamento interno do executor pai:

# Outputs go directly to parent's event stream
sub_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
    allow_direct_output=True,
)

# The caller receives sub-workflow outputs directly
async for event in parent_workflow.run(input_data, stream=True):
    if event.type == "output":
        # This output came from the sub-workflow
        print(event.data)

Solicitações e respostas

Os sub-fluxos de trabalho dão suporte total ao mecanismo de solicitação e resposta . Quando um executor dentro de um sub-fluxo de trabalho chama ctx.request_info(), o WorkflowExecutor intercepta a solicitação e a manipula com base na configuração de propagate_request.

Intercepção de solicitações no fluxo de trabalho pai (padrão)

Com propagate_request=False (o padrão), as solicitações do sub-fluxo de trabalho são encapsuladas em um SubWorkflowRequestMessage e enviadas para executores conectados no fluxo de trabalho pai. Isso permite que os executores pai lidem com a solicitação localmente:

from agent_framework import (
    SubWorkflowRequestMessage,
    SubWorkflowResponseMessage,
)


class ParentHandler(Executor):
    @handler
    async def handle_request(
        self,
        request: SubWorkflowRequestMessage,
        ctx: WorkflowContext[SubWorkflowResponseMessage],
    ) -> None:
        # Inspect the original request from the sub-workflow
        original_data = request.source_event.data

        # Create and send a response back to the sub-workflow
        response = request.create_response(my_response_data)
        await ctx.send_message(response, target_id=request.executor_id)

O create_response() método valida que o tipo de dados de resposta corresponde ao tipo esperado da solicitação original. Se os tipos não corresponderem, um TypeError será gerado.

Importante

Ao enviar a resposta de volta, use target_id=request.executor_id para rotear o SubWorkflowResponseMessage para a instância correta de WorkflowExecutor.

Propagação de solicitações para chamadores externos

Com propagate_request=True, as solicitações do sub-fluxo de trabalho são propagadas para o fluxo de eventos do fluxo de trabalho pai usando o mecanismo padrão request_info . O chamador do fluxo de trabalho pai lida com essas solicitações da mesma maneira que qualquer outra solicitação humana no loop:

sub_workflow_executor = WorkflowExecutor(
    workflow=inner_workflow,
    id="analysis-pipeline",
    propagate_request=True,
)

# Run the parent workflow and handle propagated requests
result = await parent_workflow.run(input_data)
request_info_events = result.get_request_info_events()
if request_info_events:
    responses = {}
    for event in request_info_events:
        # Handle each request (e.g., ask a human)
        responses[event.request_id] = get_human_response(event.data)
    result = await parent_workflow.run(responses=responses)

Como funciona

Quando o fluxo de trabalho pai roteia uma mensagem para:WorkflowExecutor

  1. Entrega de entrada – a mensagem é encaminhada para o executor inicial do fluxo de trabalho interno. O tipo de mensagem deve corresponder aos tipos de entrada esperados do executor inicial.
  2. Execução interna – o fluxo de trabalho interno executa seu próprio loop de superstep até a conclusão, ou até que precise de entrada externa.
  3. Coleção de saída – os eventos de saída do fluxo de trabalho interno são coletados e encaminhados com base na allow_direct_output configuração.
  4. Encaminhamento de solicitação – se o fluxo de trabalho interno tiver solicitações pendentes, elas serão encaminhadas com base na propagate_request configuração (consulte Solicitações e Respostas).
  5. Acúmulo de resposta – o WorkflowExecutor coleta respostas e retoma o sub-fluxo de trabalho somente quando todas as respostas esperadas para uma determinada execução foram recebidas.
  6. Expedição downstream – as saídas são enviadas para o próximo executor no fluxo de trabalho pai.

O subfluxo de trabalho mantém, independentemente do pai, seu próprio estado interno. As mensagens são roteadas apenas pelas bordas que conectam o WorkflowExecutor ao restante do grafo pai — não há transmissão de mensagens entre os níveis de aninhamento.

Aninhamento de vários níveis

Os sub-fluxos de trabalho podem ser aninhados em profundidade arbitrária. Cada nível mantém seu próprio contexto de execução:

# Level 1: Data preparation pipeline
data_pipeline = (
    WorkflowBuilder(start_executor=fetcher)
    .add_edge(fetcher, cleaner)
    .build()
)

# Level 2: Analysis pipeline (contains the data pipeline)
analysis_pipeline = (
    WorkflowBuilder(start_executor=data_pipeline)  # Implicit wrapping
    .add_edge(data_pipeline, analyzer)
    .build()
)

# Level 3: Top-level orchestration
top_workflow = (
    WorkflowBuilder(start_executor=coordinator)
    .add_edge(coordinator, analysis_pipeline)       # Implicit wrapping
    .add_edge(analysis_pipeline, reporter)
    .build()
)

Observação

Cada nível de aninhamento adiciona sobrecarga de execução porque o fluxo de trabalho interno executa seu próprio loop de superpasso. Mantenha a profundidade de aninhamento razoável para cenários sensíveis ao desempenho.

Aviso

Todas as execuções simultâneas de um WorkflowExecutor compartilham a mesma instância de fluxo de trabalho subjacente. Executores dentro do sub-fluxo de trabalho devem ser sem estado para evitar interferência entre execuções simultâneas.

Tratamento de erros

Quando um sub-fluxo de trabalho falha, o erro é propagado para o fluxo de trabalho pai. O WorkflowExecutor captura o evento com falha do sub-fluxo de trabalho e o converte em um evento de erro no contexto pai:

async for event in parent_workflow.run(input_data, stream=True):
    if event.type == "failed":
        print(f"Sub-workflow failed: {event.details.message}")
    elif event.type == "output":
        print(event.data)

Se o sub-fluxo de trabalho encontrar uma exceção não tratada, o fluxo de trabalho pai receberá um evento de erro contendo os detalhes da exceção, incluindo a ID do sub-fluxo de trabalho.

Definindo o ponto de verificação

Os sub-fluxos de trabalho dão suporte ao ponto de verificação. Quando um ponto de verificação é feito no fluxo de trabalho pai, o WorkflowExecutor serializa seu estado interno, incluindo o progresso de execução do fluxo de trabalho interno e todas as mensagens armazenadas em cache. Na restauração, esse estado é desserializado, permitindo que o fluxo de trabalho pai seja retomado com o subfluxo de trabalho intacto.

from agent_framework import FileCheckpointStorage, WorkflowBuilder

checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")

# Build the parent workflow with checkpointing
parent_workflow = (
    WorkflowBuilder(
        start_executor=coordinator,
        checkpoint_storage=checkpoint_storage,
    )
    .add_edge(coordinator, inner_workflow_executor)
    .add_edge(inner_workflow_executor, reviewer)
    .build()
)

# Run with automatic checkpointing
async for event in parent_workflow.run("Analyze the dataset", stream=True):
    if event.type == "output":
        print(event.data)

# Resume from a checkpoint
checkpoints = await checkpoint_storage.list_checkpoints()
async for event in parent_workflow.run(
    checkpoint_id=checkpoints[-1].checkpoint_id,
    checkpoint_storage=checkpoint_storage,
    stream=True,
):
    if event.type == "output":
        print(event.data)

Próximas Etapas