Partilhar via


Criar um fluxo de trabalho sequencial simples

Este tutorial demonstra como criar um fluxo de trabalho sequencial simples usando fluxos de trabalho do Agent Framework.

Os fluxos de trabalho sequenciais são a base da construção de sistemas complexos de agentes de IA. Este tutorial mostra como criar um fluxo de trabalho simples de duas etapas, onde cada etapa processa dados e os passa para a próxima etapa.

Visão geral

Neste tutorial, você criará um fluxo de trabalho com dois executores:

  1. Executor de Maiúsculas - Converte texto de entrada em maiúsculas
  2. Reverse Text Executor - Inverte o texto e produz o resultado final

O fluxo de trabalho demonstra conceitos fundamentais como:

  • Criar um executor personalizado com um único handler
  • Criação de um executor personalizado a partir de uma função
  • Usando WorkflowBuilder para conectar executores com bordas
  • Processamento de dados através de etapas sequenciais
  • Observando a execução do fluxo de trabalho através de eventos

Conceitos abordados

Pré-requisitos

Implementação passo a passo

As seções a seguir mostram como criar o fluxo de trabalho sequencial passo a passo.

Etapa 1: Instalar pacotes NuGet

Primeiro, instale os pacotes necessários para seu projeto .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Etapa 2: Definir o Executor de Maiúsculas

Defina um executor que converta texto em maiúsculas:

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

Pontos principais:

  • Criar uma função que recebe uma string e devolve a versão maiúscula.
  • Use BindExecutor() para criar um executor a partir da função

Etapa 3: Definir o executor de texto reverso

Defina um executor que inverta o texto:

/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
    public override ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Reverse the input text
        return ValueTask.FromResult(new string(input.Reverse().ToArray()));
    }
}

ReverseTextExecutor reverse = new();

Pontos principais:

  • Crie uma classe que herde de Executor<TInput, TOutput>
  • Implementar HandleAsync() para processar a entrada e devolver a saída

Etapa 4: Criar e conectar o fluxo de trabalho

Conecte os executores usando WorkflowBuilder:

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();

Pontos principais:

  • WorkflowBuilder construtor aceita o executor inicial
  • AddEdge() cria uma conexão direcionada de maiúsculas para inversas
  • WithOutputFrom() Especifica quais executores produzem saídas de fluxo de trabalho
  • Build() cria o fluxo de trabalho imutável

Etapa 5: Executar o fluxo de trabalho

Execute o fluxo de trabalho e observe os resultados:

// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorComplete:
            Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
            break;
    }
}

Etapa 6: Noções básicas sobre a saída do fluxo de trabalho

Ao executar o fluxo de trabalho, você verá saídas como:

UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH

A entrada "Hello, World!" é primeiro convertida em maiúsculas ("HELLO, WORLD!"), depois invertida ("!DLROW ,OLLEH").

Conceitos-chave explicados

Interface do Executor

Executores das funções:

  • Usar BindExecutor() para criar um executor a partir de uma função

Os executores implementam Executor<TInput, TOutput>:

  • TInput: O tipo de dados que este executor aceita
  • TOutput: O tipo de dados que este executor produz
  • HandleAsync: O método que processa a entrada e retorna a saída

Padrão do Construtor de Fluxo de Trabalho .NET

O WorkflowBuilder fornece uma API fluente para a construção de fluxos de trabalho:

  • Construtor: Recebe o executor inicial
  • AddEdge(): Cria conexões direcionadas entre executores
  • WithOutputFrom(): Especifica quais executores produzem saídas de fluxo de trabalho
  • Build(): Cria o fluxo de trabalho final imutável

Tipos de evento .NET

Durante a execução, você pode observar estes tipos de eventos:

  • ExecutorCompletedEvent - Quando um executor termina o processamento

Exemplo completo do .NET

Para obter a implementação completa e pronta para execução, consulte o exemplo de 01_ExecutorsAndEdges no repositório do Agent Framework.

Este exemplo inclui:

  • Implementação completa com todos usando instruções e estrutura de classe
  • Comentários adicionais explicando os conceitos de fluxo de trabalho
  • Conclua a instalação e configuração do projeto

Visão geral

Neste tutorial, você criará um fluxo de trabalho com dois executores:

  1. Conversor para maiúsculas - Converte o texto fornecido em letras maiúsculas
  2. Reverse Text Executor - Inverte o texto e produz o resultado final

O fluxo de trabalho demonstra conceitos fundamentais como:

  • Duas formas de definir uma unidade de trabalho (um nó executor):
    1. Uma classe personalizada que subclasse Executor com um método assíncrono marcado por @handler
    2. Uma função assíncrona independente decorada com @executor
  • Conectando executores com WorkflowBuilder
  • Passando dados entre etapas com ctx.send_message()
  • Resultado final com ctx.yield_output()
  • Transmissão de eventos para observação em tempo real

Conceitos abordados

Pré-requisitos

  • Python 3.10 ou posterior
  • Pacote Python do Agent Framework Core instalado: pip install agent-framework-core --pre
  • Não são necessários serviços de IA externos para este exemplo básico

Implementação passo a passo

As seções a seguir mostram como criar o fluxo de trabalho sequencial passo a passo.

Etapa 1: Importar módulos necessários

Primeiro, importe os módulos necessários do Agent Framework:

import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor

Etapa 2: Criar o primeiro executor

Crie um executor que converta texto em maiúsculas implementando um executor com um método handler:

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

Pontos principais:

  • A subclassificação Executor permite definir um nó com um nome e hooks de ciclo de vida, se necessário.
  • O @handler decorator assinala o método assíncrono que realiza a tarefa
  • A assinatura do gestor segue um contrato:
    • O primeiro parâmetro é a entrada digitada para este nó (aqui: text: str)
    • O segundo parâmetro é um WorkflowContext[T_Out], onde T_Out é o tipo de dados que este nó irá emitir via ctx.send_message() (aqui: str)
  • Dentro de um handler, normalmente calcula-se um resultado e encaminha-se para os nós seguintes usando ctx.send_message(result)

Etapa 3: Criar o segundo executor

Para passos simples, pode saltar a subclasse e definir uma função assíncrona com o mesmo padrão de assinatura (entrada tipada + WorkflowContext) e decorá-la com @executor. Isto cria um nó totalmente funcional que pode ser ligado a um fluxo:

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

Pontos principais:

  • O decorador @executor transforma uma função assíncrona isolada num nó de fluxo de trabalho
  • O WorkflowContext é parametrizado com dois tipos:
    • T_Out = Never: este nó não envia mensagens para nós a jusante
    • T_W_Out = str: este nó gera uma saída de tipo str para o fluxo de trabalho
  • Os nós terminais geram saídas usando ctx.yield_output() para fornecer resultados do fluxo de trabalho
  • O fluxo de trabalho termina quando fica inativo (já não há trabalho a fazer)

Etapa 4: Criar o fluxo de trabalho

Conecte os executores usando WorkflowBuilder:

upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)
    .set_start_executor(upper_case)
    .build()
)

Pontos principais:

  • add_edge() cria conexões direcionadas entre executores
  • set_start_executor() define o ponto de entrada
  • build() Finaliza o fluxo de trabalho

Etapa 5: Executar o fluxo de trabalho com streaming

Execute o fluxo de trabalho e observe eventos em tempo real:

async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    asyncio.run(main())

Etapa 6: Entendendo a saída

Ao executar o fluxo de trabalho, você verá eventos como:

Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH

Conceitos-chave explicados

Duas Formas de Definir Executores

  1. Classe personalizada (subclassing Executor): Melhor quando precisas de ganchos de ciclo de vida ou estado complexo. Defina um método assíncrono com o decorador @handler.
  2. Baseado em funções (@executor decorador): Melhor para etapas simples. Defina uma função assíncrona autónoma com o mesmo padrão de assinatura.

Ambas as abordagens usam a mesma assinatura do handler:

  • Primeiro parâmetro: a entrada especificada para este nó
  • Segundo parâmetro: a WorkflowContext[T_Out, T_W_Out]

Tipos de contexto de fluxo de trabalho

O WorkflowContext tipo genérico define que dados fluem entre executores.

  • WorkflowContext[T_Out] - Usado para nós que enviam mensagens do tipo T_Out para nós descendentes via ctx.send_message()
  • WorkflowContext[T_Out, T_W_Out] - Usado para nós que também geram saída de workflow do tipo T_W_Out via ctx.yield_output()
  • WorkflowContext sem parâmetros de tipo é equivalente a WorkflowContext[Never, Never], o que significa que este nó não envia mensagens para nós posteriores nem gera saída de fluxo de trabalho

Tipos de Eventos

Durante a execução do streaming, você observará estes tipos de eventos:

  • ExecutorInvokedEvent - Quando um executor começa a processar
  • ExecutorCompletedEvent - Quando um executor termina o processamento
  • WorkflowOutputEvent - Contém o resultado final do fluxo de trabalho

Padrão Python Construtor de Fluxo de Trabalho

O WorkflowBuilder fornece uma API fluente para a construção de fluxos de trabalho:

  • add_edge(): Cria conexões direcionadas entre executores
  • set_start_executor(): Define o ponto de entrada do fluxo de trabalho
  • build(): Finaliza e retorna um objeto de fluxo de trabalho imutável

Exemplo completo

Para a implementação completa e pronta a executar, consulte o exemplo no repositório Agent Framework.

Este exemplo inclui:

  • Implementação completa com todas as importações e documentação
  • Comentários adicionais explicando os conceitos de fluxo de trabalho
  • Saída de amostra mostrando os resultados esperados

Próximas Etapas