Partager via


Créer un flux de travail séquentiel simple

Ce tutoriel montre comment créer un flux de travail séquentiel simple à l’aide de flux de travail Agent Framework.

Les flux de travail séquentiels constituent la base de la création de systèmes d’agent IA complexes. Ce tutoriel montre comment créer un flux de travail en deux étapes simple où chaque étape traite les données et les transmet à l’étape suivante.

Aperçu

Dans ce tutoriel, vous allez créer un flux de travail avec deux exécuteurs :

  1. Exécuteur majuscule - Convertit le texte d’entrée en majuscules
  2. Exécuteur de texte inverse : inverse le texte et génère le résultat final

Le flux de travail illustre les concepts fondamentaux tels que :

  • Création d’un exécuteur personnalisé avec un seul gestionnaire
  • Création d’un exécuteur personnalisé à partir d’une fonction
  • Utilisation de WorkflowBuilder pour connecter des exécuteurs avec des arêtes
  • Traitement des données par le biais d’étapes séquentielles
  • Observation de l’exécution du flux de travail par le biais d’événements

Concepts couverts

Prerequisites

Implémentation pas à pas

Les sections suivantes montrent comment générer le flux de travail séquentiel étape par étape.

Étape 1 : Installer des packages NuGet

Tout d’abord, installez les packages requis pour votre projet .NET :

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Étape 2 : Définir l’exécuteur majuscule

Définissez un exécuteur qui convertit le texte en majuscules :

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

Points clés :

  • Créer une fonction qui prend une chaîne et retourne la version majuscule
  • Permet BindExecutor() de créer un exécuteur à partir de la fonction

Étape 3 : Définir l’exécuteur de texte inverse

Définissez un exécuteur qui inverse le texte :

/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
    public override ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Reverse the input text
        return ValueTask.FromResult(new string(input.Reverse().ToArray()));
    }
}

ReverseTextExecutor reverse = new();

Points clés :

  • Créer une classe qui hérite de Executor<TInput, TOutput>
  • Implémenter HandleAsync() pour traiter l’entrée et retourner la sortie

Étape 4 : Générer et connecter le flux de travail

Connectez les exécuteurs à l’aide de WorkflowBuilder:

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();

Points clés :

  • WorkflowBuilder constructeur accepte l’exécuteur de démarrage
  • AddEdge() crée une connexion dirigée de l'original vers l'inverse
  • WithOutputFrom() spécifie les exécuteurs qui produisent des sorties de flux de travail
  • Build() crée le flux de travail immuable

Étape 5 : Exécuter le flux de travail

Exécutez le flux de travail et observez les résultats :

// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorComplete:
            Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
            break;
    }
}

Étape 6 : Compréhension de la sortie du flux de travail

Lorsque vous exécutez le flux de travail, vous verrez la sortie comme suit :

UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH

L’entrée « Hello, World ! » est d’abord convertie en majuscules (« HELLO, WORLD ! »), puis inversée (« !DLROW ,OLLEH »).

Concepts clés expliqués

Interface d’exécuteur

Exécuteurs de fonctions :

  • Permet BindExecutor() de créer un exécuteur à partir d’une fonction

Les exécuteurs implémentent Executor<TInput, TOutput>:

  • TInput : le type de données que cet exécuteur accepte
  • TOutput : Le type de données que cet exécuteur produit
  • HandleAsync : méthode qui traite l’entrée et retourne la sortie

Modèle du Générateur de flux de travail .NET

Le WorkflowBuilder fournit une API fluide pour la construction de flux de travail :

  • Constructeur : prend l’exécuteur de départ
  • AddEdge() : crée des connexions dirigées entre les exécuteurs
  • WithOutputFrom() : spécifie les exécuteurs qui produisent des sorties de flux de travail
  • Build() : Crée le flux de travail immuable final

Types d’événements .NET

Pendant l’exécution, vous pouvez observer ces types d’événements :

  • ExecutorCompletedEvent - Lorsqu’un exécuteur termine le traitement

Exemple .NET complet

Pour obtenir l’implémentation complète et prête à l’exécution, consultez l’exemple 01_ExecutorsAndEdges dans le référentiel Agent Framework.

Cet exemple inclut les éléments suivants :

  • Implémentation complète avec toutes les instructions using et la structure de classe
  • Commentaires supplémentaires expliquant les concepts du flux de travail
  • Terminer l’installation et la configuration du projet

Aperçu

Dans ce tutoriel, vous allez créer un flux de travail avec deux exécuteurs :

  1. Convertisseur en majuscules - Convertit le texte d’entrée en majuscules
  2. Exécuteur de texte inverse : inverse le texte et génère le résultat final

Le flux de travail illustre les concepts fondamentaux tels que :

  • Utilisation de l’élément @executor décoratif pour créer des nœuds de flux de travail
  • Connexion d’exécuteurs avec WorkflowBuilder
  • Passage de données entre les étapes avec ctx.send_message()
  • Résultat final avec ctx.yield_output()
  • Diffusion en continu d’événements pour l’observabilité en temps réel

Concepts couverts

Prerequisites

  • Python 3.10 ou version ultérieure
  • Package Python Agent Framework Core installé : pip install agent-framework-core --pre
  • Aucun service IA externe n’est requis pour cet exemple de base

Implémentation pas à pas

Les sections suivantes montrent comment générer le flux de travail séquentiel étape par étape.

Étape 1 : Importer des modules requis

Tout d’abord, importez les modules nécessaires à partir de Agent Framework :

import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor

Étape 2 : Créer le premier exécuteur

Créez un exécuteur qui convertit du texte en majuscules en implémentant un exécuteur avec une méthode de gestionnaire :

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

Points clés :

  • Le @executor décorateur inscrit cette fonction en tant que nœud de flux de travail
  • WorkflowContext[str] indique que cet exécuteur envoie une chaîne en aval en spécifiant le premier paramètre de type
  • ctx.send_message() transmet les données à l’étape suivante

Étape 3 : Créer le deuxième exécuteur

Créez un exécuteur qui inverse le texte et génère la sortie finale d’une méthode décorée avec @executor:

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

Points clés :

  • WorkflowContext[Never, str] indique qu’il s’agit d’un exécuteur de terminal qui n’envoie aucun message en spécifiant Never comme premier paramètre de type, mais qui produit des sorties de flux de travail en spécifiant str comme deuxième paramètre
  • ctx.yield_output() fournit le résultat final du flux de travail
  • Le flux de travail se termine lorsqu’il devient inactif

Étape 4 : Générer le flux de travail

Connectez les exécuteurs à l’aide de WorkflowBuilder:

upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)
    .set_start_executor(upper_case)
    .build()
)

Points clés :

  • add_edge() crée des connexions dirigées entre exécuteurs
  • set_start_executor() définit le point d’entrée
  • build() finalise le flux de travail

Étape 5 : Exécuter le flux de travail avec streaming

Exécutez le flux de travail et observez les événements en temps réel :

async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    asyncio.run(main())

Étape 6 : Présentation de la sortie

Lorsque vous exécutez le flux de travail, vous verrez des événements tels que :

Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH

Concepts clés expliqués

Types de contexte de flux de travail

Le WorkflowContext type générique définit les flux de données entre les exécuteurs :

  • WorkflowContext[str] - Envoie une chaîne à l’exécuteur suivant
  • WorkflowContext[Never, str] - Exécuteur de terminal qui génère la sortie du flux de travail de type chaîne

Types d’événements

Pendant l’exécution de streaming, vous observerez ces types d’événements :

  • ExecutorInvokedEvent - Lorsqu’un exécuteur démarre le traitement
  • ExecutorCompletedEvent - Lorsqu’un exécuteur termine le traitement
  • WorkflowOutputEvent - Contient le résultat final du flux de travail

Modèle générateur de flux de travail Python

Le WorkflowBuilder fournit une API fluide pour la construction de flux de travail :

  • add_edge() : crée des connexions dirigées entre exécuteurs
  • set_start_executor() : définit le point d’entrée du flux de travail
  • build() : Finalise et retourne un objet de flux de travail immuable

Exemple complet

Pour obtenir l’implémentation complète et prête à l’exécution, consultez l’exemple dans le référentiel Agent Framework.

Cet exemple inclut les éléments suivants :

  • Implémentation complète avec toutes les importations et la documentation
  • Commentaires supplémentaires expliquant les concepts du flux de travail
  • Exemple de sortie montrant les résultats attendus

Étapes suivantes