Compartilhar via


Criar um fluxo de trabalho sequencial simples

Este tutorial demonstra como criar um fluxo de trabalho sequencial simples usando fluxos de trabalho do Agent Framework.

Fluxos de trabalho sequenciais são a base da criação de sistemas complexos de agente de IA. Este tutorial mostra como criar um fluxo de trabalho simples de duas etapas em que cada etapa processa dados e os passa para a próxima etapa.

Visão geral

Neste tutorial, você criará um fluxo de trabalho com dois executores:

  1. Executor de Maiúsculas – Converte o texto de entrada para maiúsculas
  2. Executor de Texto Inverso – Inverte o texto e gera o resultado final

O fluxo de trabalho demonstra os principais conceitos como:

  • Criando um executor personalizado com um manipulador
  • Criando um executor personalizado a partir de uma função
  • Usar WorkflowBuilder para conectar executores com bordas
  • Processamento de dados por meio de etapas sequenciais
  • Observando a execução do fluxo de trabalho por meio de eventos

Conceitos abordados

Pré-requisitos

Implementação passo a passo

As seções a seguir mostram como criar o fluxo de trabalho sequencial passo a passo.

Etapa 1: Instalar pacotes NuGet

Primeiro, instale os pacotes necessários para seu projeto do .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Etapa 2: Definir o Executor em Letras Maiúsculas

Defina um executor que converte o texto em letras maiúsculas:

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

Pontos-chave:

  • Criar uma função que usa uma cadeia de caracteres e retorna a versão maiúscula
  • Usar BindExecutor() para criar um executor com base na função

Etapa 3: Definir o Executor de Texto Inverso

Defina um executor que inverte o texto:

/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
    public override ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Reverse the input text
        return ValueTask.FromResult(new string(input.Reverse().ToArray()));
    }
}

ReverseTextExecutor reverse = new();

Pontos-chave:

  • Criar uma classe que herda de Executor<TInput, TOutput>
  • Implementar HandleAsync() para processar a entrada e retornar a saída

Etapa 4: Criar e conectar o fluxo de trabalho

Conecte os executores usando WorkflowBuilder:

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();

Pontos-chave:

  • WorkflowBuilder construtor usa o executor inicial
  • AddEdge() cria uma conexão direcionada de maiúsculas para reverso
  • WithOutputFrom() especifica quais executores produzem saídas de fluxo de trabalho
  • Build() cria o fluxo de trabalho imutável

Etapa 5: Executar o fluxo de trabalho

Execute o fluxo de trabalho e observe os resultados:

// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorComplete:
            Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
            break;
    }
}

Etapa 6: Noções básicas sobre a saída do fluxo de trabalho

Ao executar o fluxo de trabalho, você verá uma saída como:

UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH

A entrada "Olá, Mundo!" é convertida primeiro em maiúscula ("OLÁ, MUNDO!"), depois revertida ("! DLROW ,OLLEH").

Principais conceitos explicados

Interface de Executor

Executores de funções:

  • Usar BindExecutor() para criar um executor a partir de uma função

Implementação de Executor<TInput, TOutput>executores:

  • TInput: o tipo de dados que este executor aceita
  • TOutput: o tipo de dados que este executor produz
  • HandleAsync: o método que processa a entrada e retorna a saída

Padrão do Construtor de Fluxo de Trabalho do .NET

O WorkflowBuilder fornece uma API fluente para a construção de fluxos de trabalho:

  • Construtor: usa o executor inicial
  • AddEdge(): cria conexões direcionadas entre executores
  • WithOutputFrom(): especifica quais executores produzem saídas de fluxo de trabalho
  • Build(): cria o fluxo de trabalho imutável final

Tipos de evento .NET

Durante a execução, você pode observar esses tipos de evento:

  • ExecutorCompletedEvent - Quando um executor conclui o processamento

Exemplo completo do .NET

Para obter a implementação completa e pronta para execução, consulte o exemplo de 01_ExecutorsAndEdges no repositório do Agent Framework.

Este exemplo inclui:

  • Implementação completa com todas as instruções de uso e estrutura de classe
  • Comentários adicionais explicando os conceitos de fluxo de trabalho
  • Concluir a preparação e configuração do projeto

Visão geral

Neste tutorial, você criará um fluxo de trabalho com dois executores:

  1. Executor de Maiúsculas - Converte texto de entrada em maiúsculas
  2. Executor de Texto Inverso – Inverte o texto e gera o resultado final

O fluxo de trabalho demonstra os principais conceitos como:

  • Duas maneiras de definir uma unidade de trabalho (um nó de execução):
    1. Uma classe personalizada que subclasse Executor com um método assíncrono marcado por @handler
    2. Uma função assíncrona independente decorada com @executor
  • Conectando executores com WorkflowBuilder
  • Passando dados entre etapas com ctx.send_message()
  • Gerando saída final com ctx.yield_output()
  • Eventos de streaming para observabilidade em tempo real

Conceitos abordados

Pré-requisitos

  • Python 3.10 ou posterior
  • Pacote python do Agent Framework Core instalado: pip install agent-framework-core --pre
  • Nenhum serviço de IA externo necessário para este exemplo básico

Implementação passo a passo

As seções a seguir mostram como criar o fluxo de trabalho sequencial passo a passo.

Etapa 1: Importar módulos necessários

Primeiro, importe os módulos necessários do Agent Framework:

import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor

Etapa 2: Criar o Primeiro Executor

Crie um executor que converte o texto em letras maiúsculas implementando um executor com um método de manipulador:

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

Pontos-chave:

  • A subclasse Executor permite definir um nó nomeado com ganchos de ciclo de vida, se necessário
  • O decorador @handler marca o método assíncrono que realiza a tarefa.
  • A assinatura do manipulador segue um contrato:
    • O primeiro parâmetro é a entrada que foi digitada para este nó (aqui: text: str)
    • O segundo parâmetro é um WorkflowContext[T_Out], onde T_Out é o tipo de dados que esse nó emitirá por meio de ctx.send_message() (aqui: str)
  • Em um manipulador, você normalmente calcula um resultado e encaminha-o para nós downstream usando ctx.send_message(result)

Etapa 3: Criar o Segundo Executor

Para etapas simples, você pode ignorar a subclasse e definir uma função assíncrona com o mesmo padrão de assinatura (entrada tipada + WorkflowContext) e decorá-la com @executor. Isso cria um nó totalmente funcional que pode ser conectado a um fluxo:

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

Pontos-chave:

  • O @executor decorador transforma uma função assíncrona independente em um nó de fluxo de trabalho
  • O WorkflowContext parâmetro é parametrizado com dois tipos:
    • T_Out = Never: este nó não envia mensagens para os nós a jusante
    • T_W_Out = str: este nó produz uma saída de fluxo de trabalho do tipo str
  • Nós de terminal produzem saídas usando ctx.yield_output() para fornecer resultados de fluxo de trabalho
  • O fluxo de trabalho é concluído quando fica ocioso (não há mais trabalho a ser feito)

Etapa 4: Criar o fluxo de trabalho

Conecte os executores usando WorkflowBuilder:

upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)
    .set_start_executor(upper_case)
    .build()
)

Pontos-chave:

  • add_edge() cria conexões direcionadas entre executores
  • set_start_executor() define o ponto de entrada
  • build() finaliza o fluxo de trabalho

Etapa 5: Executar o fluxo de trabalho com streaming

Execute o fluxo de trabalho e observe eventos em tempo real:

async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    asyncio.run(main())

Etapa 6: Noções básicas sobre a saída

Ao executar o fluxo de trabalho, você verá eventos como:

Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH

Principais conceitos explicados

Duas maneiras de definir executores

  1. Classe personalizada (subclasse Executor): melhor quando você precisa de ganchos de ciclo de vida ou estado complexo. Defina um método assíncrono com o decorador @handler.
  2. Baseado em função (@executor decorador): é melhor para etapas simples. Defina uma função assíncrona autônoma com o mesmo padrão de assinatura.

Ambas as abordagens usam a mesma assinatura de manipulador:

  • Primeiro parâmetro: a entrada digitada para este nó
  • Segundo parâmetro: a WorkflowContext[T_Out, T_W_Out]

Tipos de contexto de fluxo de trabalho

O WorkflowContext tipo genérico define quais dados fluem entre executores:

  • WorkflowContext[T_Out] - Usado para nós que enviam mensagens do tipo T_Out para nós posteriores por meio de ctx.send_message()
  • WorkflowContext[T_Out, T_W_Out] - Usado para nós que também produzem saída de fluxo de trabalho do tipo T_W_Out por meio de ctx.yield_output()
  • WorkflowContext sem parâmetros de tipo é equivalente a WorkflowContext[Never, Never], o que significa que esse nó não envia mensagens para nós posteriores nem gera saída de fluxo de trabalho

Tipos de evento

Durante a execução do streaming, você observará estes tipos de evento:

  • ExecutorInvokedEvent - Quando um executor inicia o processamento
  • ExecutorCompletedEvent - Quando um executor conclui o processamento
  • WorkflowOutputEvent - Contém o resultado final do fluxo de trabalho

Padrão do Construtor de Fluxo de Trabalho do Python

O WorkflowBuilder fornece uma API fluente para a construção de fluxos de trabalho:

  • add_edge(): cria conexões direcionadas entre executores
  • set_start_executor(): define o ponto de entrada do fluxo de trabalho
  • build(): finaliza e retorna um objeto de fluxo de trabalho imutável

Exemplo completo

Para obter a implementação completa e pronta para execução, consulte o exemplo no repositório do Agent Framework.

Este exemplo inclui:

  • Implementação completa com todas as importações e documentação
  • Comentários adicionais explicando os conceitos de fluxo de trabalho
  • Saída de exemplo mostrando os resultados esperados

Próximas etapas