Compartir a través de


Crear un flujo de trabajo secuencial simple

En este tutorial se muestra cómo crear un flujo de trabajo secuencial sencillo mediante flujos de trabajo del marco de agente.

Los flujos de trabajo secuenciales son la base de la creación de sistemas complejos de agentes de IA. En este tutorial se muestra cómo crear un flujo de trabajo sencillo de dos pasos en el que cada paso procesa los datos y lo pasa al paso siguiente.

Información general

En este tutorial, creará un flujo de trabajo con dos ejecutores:

  1. Ejecutor en mayúsculas : convierte el texto de entrada en mayúsculas.
  2. Ejecutor de texto inverso : invierte el texto y genera el resultado final.

El flujo de trabajo muestra conceptos básicos como:

  • Creación de un ejecutor personalizado con un controlador
  • Creación de un ejecutor personalizado a partir de una función
  • Usar WorkflowBuilder para conectar ejecutores con bordes
  • Procesamiento de datos mediante pasos secuenciales
  • Observar la ejecución del flujo de trabajo a través de eventos

Conceptos tratados

Prerrequisitos

  • SDK de .NET 8.0 o posterior
  • No se requiere ningún servicio de inteligencia artificial externo para este ejemplo básico
  • Una nueva aplicación de consola

Implementación paso a paso

En las secciones siguientes se muestra cómo compilar el flujo de trabajo secuencial paso a paso.

Paso 1: Instalar paquetes NuGet

En primer lugar, instale los paquetes necesarios para el proyecto de .NET:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Paso 2: Definir el ejecutor en mayúsculas

Defina un ejecutor que convierta texto en mayúsculas:

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

Puntos clave:

  • Cree una función que tome una cadena y devuelva la versión en mayúsculas.
  • Uso BindExecutor() para crear un ejecutor a partir de la función

Paso 3: Definir el ejecutor de texto inverso

Defina un ejecutor que invierte el texto:

/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
    public override ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Reverse the input text
        return ValueTask.FromResult(new string(input.Reverse().ToArray()));
    }
}

ReverseTextExecutor reverse = new();

Puntos clave:

  • Creación de una clase que herede de Executor<TInput, TOutput>
  • Implementación HandleAsync() para procesar la entrada y devolver la salida

Paso 4: Compilar y conectar el flujo de trabajo

Conecte los ejecutores mediante WorkflowBuilder:

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();

Puntos clave:

  • WorkflowBuilder constructor toma el ejecutor inicial
  • AddEdge() crea una conexión dirigida desde mayúsculas a inversas
  • WithOutputFrom() especifica qué ejecutores generan salidas de flujo de trabajo.
  • Build() crea el flujo de trabajo inmutable.

Paso 5: Ejecutar el flujo de trabajo

Ejecute el flujo de trabajo y observe los resultados:

// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorComplete:
            Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
            break;
    }
}

Paso 6: Descripción de la salida del flujo de trabajo

Al ejecutar el flujo de trabajo, verá una salida similar a la siguiente:

UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH

La entrada "Hello, World!" se convierte primero en mayúsculas ("HELLO, WORLD!") y luego se invierte ("! DLROW ,OLLEH").

Conceptos clave explicados

Interfaz ejecutor

Ejecutores de funciones:

  • Uso BindExecutor() para crear un ejecutor a partir de una función

Los ejecutores implementan Executor<TInput, TOutput>:

  • TInput: el tipo de datos que acepta este ejecutor
  • TOutput: el tipo de datos que genera este ejecutor
  • HandleAsync: el método que procesa la entrada y devuelve la salida.

Patrón del Generador de flujos de trabajo de .NET

WorkflowBuilder proporciona una API fluida para construir flujos de trabajo:

  • Constructor: toma el ejecutor inicial
  • AddEdge(): crea conexiones dirigidas entre ejecutores
  • WithOutputFrom():especifica qué ejecutores generan salidas de flujo de trabajo.
  • Build(): crea el flujo de trabajo inmutable final.

Tipos de eventos de .NET

Durante la ejecución, puede observar estos tipos de eventos:

  • ExecutorCompletedEvent - Cuando un ejecutor termina de procesar

Ejemplo completo de .NET

Para obtener la implementación completa y lista para ejecutarse, consulte el ejemplo de 01_ExecutorsAndEdges en el repositorio de Agent Framework.

Este ejemplo incluye:

  • Implementación completa con todas las sentencias using y la estructura de clase
  • Comentarios adicionales que explican los conceptos del flujo de trabajo
  • Finalizar la preparación y la configuración del proyecto

Información general

En este tutorial, creará un flujo de trabajo con dos ejecutores:

  1. Ejecutor de mayúsculas : convierte el texto de entrada en mayúsculas
  2. Ejecutor de texto inverso : invierte el texto y genera el resultado final.

El flujo de trabajo muestra conceptos básicos como:

  • Dos maneras de definir una unidad de trabajo (un nodo ejecutor):
    1. Una clase personalizada que subclase de Executor con un método asíncrono designado por @handler
    2. Una función asincrónica independiente decorada con @executor
  • Conexión de ejecutores con WorkflowBuilder
  • Paso de datos entre pasos con ctx.send_message()
  • Generando la salida final con ctx.yield_output()
  • Streaming de eventos para la observabilidad en tiempo real

Conceptos tratados

Prerrequisitos

  • Python 3.10 o posterior
  • Paquete de Python de Agent Framework Core instalado: pip install agent-framework-core --pre
  • No se requiere ningún servicio de inteligencia artificial externo para este ejemplo básico

Implementación paso a paso

En las secciones siguientes se muestra cómo compilar el flujo de trabajo secuencial paso a paso.

Paso 1: Importar módulos necesarios

En primer lugar, importe los módulos necesarios desde Agent Framework:

import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor

Paso 2: Crear el primer ejecutor

Cree un ejecutor que convierta texto en mayúsculas mediante la implementación de un ejecutor con un método de controlador:

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

Puntos clave:

  • La subclasificación Executor le permite definir un nodo con nombre con enlaces de ciclo de vida si es necesario.
  • El decorador @handler marca el método asincrónico que lleva a cabo el trabajo.
  • La firma del manejador sigue un contrato:
    • El primer parámetro es la entrada tipada para este nodo (como se muestra aquí: text: str)
    • El segundo parámetro es un WorkflowContext[T_Out], donde T_Out es el tipo de datos que este nodo emitirá a través de ctx.send_message() (aquí: str)
  • Dentro de un controlador, normalmente se calcula un resultado y se reenvía a los nodos de bajada mediante ctx.send_message(result)

Paso 3: Crear el segundo ejecutor

Puede omitir las subclases y definir una función asincrónica con el mismo patrón de firma (entrada tipada + WorkflowContext) y decorarla con @executor. Esto crea un nodo totalmente funcional que se puede cablearse en un flujo:

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

Puntos clave:

  • El @executor decorador transforma una función asincrónica independiente en un nodo de flujo de trabajo.
  • WorkflowContext se parametriza con dos tipos:
    • T_Out = Never: este nodo no envía mensajes a los nodos de bajada.
    • T_W_Out = str: este nodo produce la salida del flujo de trabajo del tipo str
  • Los nodos de terminal producen salidas mediante ctx.yield_output() para proporcionar resultados de flujo de trabajo
  • El flujo de trabajo se completa cuando se vuelve inactivo (ya no hay trabajo que hacer)

Paso 4: Compilar el flujo de trabajo

Conecte los ejecutores mediante WorkflowBuilder:

upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)
    .set_start_executor(upper_case)
    .build()
)

Puntos clave:

  • add_edge() crea conexiones dirigidas entre ejecutores
  • set_start_executor() define el punto de entrada.
  • build() finaliza el flujo de trabajo.

Paso 5: Ejecución del flujo de trabajo con streaming

Ejecute el flujo de trabajo y observe eventos en tiempo real:

async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    asyncio.run(main())

Paso 6: Comprender la salida

Al ejecutar el flujo de trabajo, verá eventos como:

Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH

Conceptos clave explicados

Dos maneras de definir ejecutores

  1. Clase personalizada (subclases Executor): mejor cuando necesite enlaces de ciclo de vida o estado complejo. Defina un método asincrónico con el @handler decorador.
  2. Basado en funciones (@executor decorador): Mejor para pasos sencillos. Defina una función asincrónica independiente con el mismo patrón de firma.

Ambos enfoques usan la misma firma del controlador:

  • Primer parámetro: entrada tipificada de este nodo
  • Segundo parámetro: WorkflowContext[T_Out, T_W_Out]

Tipos de contexto de flujo de trabajo

El WorkflowContext tipo genérico define qué flujos de datos fluyen entre ejecutores:

  • WorkflowContext[T_Out] : se usa para los nodos que envían mensajes de tipo T_Out a nodos de bajada a través de ctx.send_message()
  • WorkflowContext[T_Out, T_W_Out] : se usa para los nodos que también producen la salida del flujo de trabajo de tipo T_W_Out a través de ctx.yield_output()
  • WorkflowContext sin parámetros de tipo es equivalente a WorkflowContext[Never, Never], lo que significa que este nodo no envía mensajes a nodos de bajada ni produce la salida del flujo de trabajo.

Tipos de eventos

Durante la ejecución de streaming, observará estos tipos de eventos:

  • ExecutorInvokedEvent - Cuando un ejecutor inicia el procesamiento
  • ExecutorCompletedEvent - Cuando un ejecutor termina de procesar
  • WorkflowOutputEvent : contiene el resultado final del flujo de trabajo.

Patrón del Generador de flujos de trabajo de Python

WorkflowBuilder proporciona una API fluida para construir flujos de trabajo:

  • add_edge(): crea conexiones dirigidas entre ejecutores
  • set_start_executor(): define el punto de entrada del flujo de trabajo.
  • build(): finaliza y devuelve un objeto de flujo de trabajo inmutable.

Ejemplo completo

Para obtener la implementación completa y lista para ejecutarse, consulte el ejemplo en el repositorio de Agent Framework.

Este ejemplo incluye:

  • Implementación completa con todas las importaciones y documentación
  • Comentarios adicionales que explican los conceptos del flujo de trabajo
  • Salida de ejemplo que muestra los resultados esperados

Pasos siguientes