Freigeben über


Erstellen eines einfachen sequenziellen Workflows

In diesem Lernprogramm wird das Erstellen eines einfachen sequenziellen Workflows mithilfe von Agent Framework-Workflows veranschaulicht.

Sequenzielle Workflows sind die Grundlage für die Erstellung komplexer KI-Agent-Systeme. In diesem Lernprogramm wird gezeigt, wie Sie einen einfachen zweistufigen Workflow erstellen, in dem jeder Schritt Daten verarbeitet und an den nächsten Schritt übergibt.

Überblick

In diesem Lernprogramm erstellen Sie einen Workflow mit zwei Executoren:

  1. Großbuchstaben-Executor – Eingabetext in Großbuchstaben konvertieren
  2. Umgekehrter Textausführer – Umkehrt den Text und gibt das Endergebnis aus.

Der Workflow veranschaulicht kerne Konzepte wie:

  • Erstellen eines benutzerdefinierten Executors mit einem Handler
  • Erstellen eines benutzerdefinierten Executors aus einer Funktion
  • Verwenden von WorkflowBuilder zur Verbindung von Ausführenden mit Kanten
  • Verarbeiten von Daten durch sequenzielle Schritte
  • Beobachtung der Workflowausführung durch Ereignisse

Behandelte Konzepte

Voraussetzungen

  • .NET 8.0 SDK oder höher
  • Für dieses grundlegende Beispiel sind keine externen KI-Dienste erforderlich.
  • Eine neue Konsolenanwendung

Schrittweise Implementierung

In den folgenden Abschnitten wird gezeigt, wie Sie den sequenziellen Workflow schritt für Schritt erstellen.

Schritt 1: Installieren von NuGet-Paketen

Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:

dotnet add package Microsoft.Agents.AI.Workflows --prerelease

Schritt 2: Definieren des Großbuchstaben-Executors

Definieren Sie einen Executor, der Text in Großbuchstaben konvertiert:

using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;

/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");

Wichtige Punkte:

  • Erstellen einer Funktion, die eine Zeichenfolge verwendet und die Großbuchstabenversion zurückgibt
  • Verwenden Sie BindExecutor(), um einen Executor aus der Funktion zu erstellen

Schritt 3: Definieren des Umgekehrten Textausführers

Definieren Sie einen Executor, der den Text umkehrt:

/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
    public override ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
    {
        // Reverse the input text
        return ValueTask.FromResult(new string(input.Reverse().ToArray()));
    }
}

ReverseTextExecutor reverse = new();

Wichtige Punkte:

  • Erstelle eine Klasse, die von Executor<TInput, TOutput> erbt.
  • Implementieren HandleAsync() , um die Eingabe zu verarbeiten und die Ausgabe zurückzugeben

Schritt 4: Erstellen und Verbinden des Workflows

Verbinden sie die Executoren mithilfe von WorkflowBuilder:

// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();

Wichtige Punkte:

  • WorkflowBuilder Konstruktor übernimmt den Startausführer
  • AddEdge() erstellt eine gerichtete Verbindung von Großbuchstaben zur Umkehrung.
  • WithOutputFrom() Gibt an, welche Executoren Workflowausgaben erzeugen
  • Build() erstellt den unveränderlichen Workflow.

Schritt 5: Ausführen des Workflows

Führen Sie den Workflow aus, und beobachten Sie die Ergebnisse:

// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
    switch (evt)
    {
        case ExecutorCompletedEvent executorComplete:
            Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
            break;
    }
}

Schritt 6: Verständnis des Workflows-Ergebnisses

Wenn Sie den Workflow ausführen, wird die Ausgabe wie folgt angezeigt:

UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH

Die Eingabe "Hello, World!" wird zuerst in Großbuchstaben konvertiert ("HELLO, WORLD!"), dann umgekehrt ("! DLROW ,OLLEH").

Wichtige Konzepte erläutert

Executor-Schnittstelle

Exekutoren für Funktionen

  • Verwenden Sie BindExecutor(), um einen Executor aus einer Funktion zu erstellen

Executors implementieren Executor<TInput, TOutput>:

  • TInput: Der Datentyp, den dieser Executor akzeptiert
  • TOutput: Der Typ der Daten, die dieser Executor erzeugt
  • HandleAsync: Die Methode, die die Eingabe verarbeitet und die Ausgabe zurückgibt

.NET-Workflow-Generator-Muster

Dies WorkflowBuilder stellt eine Fluent-API zum Erstellen von Workflows bereit:

  • Konstruktor: Übernimmt den Ausgangs-Executor
  • AddEdge(): Erstellt gerichtete Verbindungen zwischen Executoren
  • WithOutputFrom(): Gibt an, welche Executoren Workflowausgaben erzeugen
  • Build(): Erstellt den endgültigen unveränderlichen Workflow

.NET-Ereignistypen

Während der Ausführung können Sie diese Ereignistypen beobachten:

  • ExecutorCompletedEvent - Wenn ein Executor die Verarbeitung beendet

Vollständiges .NET-Beispiel

Die vollständige, einsatzbereite Implementierung finden Sie im 01_ExecutorsAndEdges Beispiel im Agent Framework-Repository.

Dieses Beispiel umfasst:

  • Vollständige Implementierung mit allen using-Anweisungen und Klassenstruktur
  • Zusätzliche Kommentare zur Erläuterung der Workflowkonzepte
  • Abschließen der Projekteinrichtung und -konfiguration

Überblick

In diesem Lernprogramm erstellen Sie einen Workflow mit zwei Executoren:

  1. Großbuchstaben-Konvertierer – Konvertiert Eingabetext in Großbuchstaben
  2. Umgekehrter Textausführer – Umkehrt den Text und gibt das Endergebnis aus.

Der Workflow veranschaulicht kerne Konzepte wie:

  • Zwei Möglichkeiten, eine Arbeitseinheit (ein Executor Node) zu definieren:
    1. Eine benutzerdefinierte Klasse, die von Executor abgeleitet ist und eine asynchrone Methode, die durch @handler gekennzeichnet ist.
    2. Eine eigenständige asynchrone Funktion, die mit @executor dekoriert ist.
  • Verbinden von Executoren mit WorkflowBuilder
  • Übergeben von Daten zwischen Schritten mit ctx.send_message()
  • Erzeugung der Endausgabe mit ctx.yield_output()
  • Streamingereignisse zur Echtzeitbeobachtbarkeit

Behandelte Konzepte

Voraussetzungen

  • Python 3.10 oder höher
  • Agent Framework Core Python-Paket installiert: pip install agent-framework-core --pre
  • Für dieses grundlegende Beispiel sind keine externen KI-Dienste erforderlich.

Schrittweise Implementierung

In den folgenden Abschnitten wird gezeigt, wie Sie den sequenziellen Workflow schritt für Schritt erstellen.

Schritt 1: Importieren erforderlicher Module

Importieren Sie zunächst die erforderlichen Module aus Dem Agent Framework:

import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor

Schritt 2: Erstellen des ersten Executors

Erstellen Sie einen Executor, der Text in Großbuchstaben konvertiert, indem Sie einen Executor mit einer Handlermethode implementieren:

class UpperCase(Executor):
    def __init__(self, id: str):
        super().__init__(id=id)

    @handler
    async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
        """Convert the input to uppercase and forward it to the next node.

        Note: The WorkflowContext is parameterized with the type this handler will
        emit. Here WorkflowContext[str] means downstream nodes should expect str.
        """
        result = text.upper()

        # Send the result to the next executor in the workflow.
        await ctx.send_message(result)

Wichtige Punkte:

  • Mithilfe von Executor Unterklassen können Sie bei Bedarf einen benannten Knoten mit Lifecycle-Hooks definieren.
  • Der @handler Dekorateur markiert die asynchrone Methode, die die Arbeit ausführt.
  • Die Handlersignatur folgt einem Vertrag:
    • Der erste Parameter ist die eingegebene Eingabe für diesen Knoten (hier: text: str)
    • Der zweite Parameter ist ein WorkflowContext[T_Out], wobei T_Out der Datentyp ist, den dieser Knoten über ctx.send_message() (hier: str) ausgeben wird.
  • Innerhalb eines Handlers berechnen Sie in der Regel ein Ergebnis und leiten es an nachgelagerte Knoten mithilfe von ctx.send_message(result) weiter.

Schritt 3: Erstellen des zweiten Executors

Für einfache Schritte können Sie die Unterklassen überspringen und eine asynchrone Funktion mit demselben Signaturmuster (eingegebene Eingabe + WorkflowContext) definieren und mit @executorversehen. Dadurch wird ein voll funktionsfähiger Knoten erstellt, der in einen Fluss verkabelt werden kann:

@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
    """Reverse the input and yield the workflow output."""
    result = text[::-1]

    # Yield the final output for this workflow run
    await ctx.yield_output(result)

Wichtige Punkte:

  • Der @executor Dekorateur wandelt eine eigenständige asynchrone Funktion in einen Workflowknoten um.
  • Dies WorkflowContext ist mit zwei Typen parametrisiert:
    • T_Out = Never: Dieser Knoten sendet keine Nachrichten an nachgeschaltete Knoten.
    • T_W_Out = str: Dieser Knoten liefert die Workflowausgabe des Typs. str
  • Terminalknoten liefern Ausgaben mithilfe von ctx.yield_output(), um Workflow-Resultate bereitzustellen.
  • Der Workflow wird abgeschlossen, wenn er in den Leerlauf übergeht (keine Arbeit mehr zu erledigen)

Schritt 4: Erstellen des Workflows

Verbinden sie die Executoren mithilfe von WorkflowBuilder:

upper_case = UpperCase(id="upper_case_executor")

workflow = (
    WorkflowBuilder()
    .add_edge(upper_case, reverse_text)
    .set_start_executor(upper_case)
    .build()
)

Wichtige Punkte:

  • add_edge() erstellt gerichtete Verbindungen zwischen Executoren
  • set_start_executor() definiert den Einstiegspunkt.
  • build() finalisiert den Workflow.

Schritt 5: Ausführen des Workflows mit Streaming

Führen Sie den Workflow aus und beobachten Sie Ereignisse in Echtzeit:

async def main():
    # Run the workflow and stream events
    async for event in workflow.run_stream("hello world"):
        print(f"Event: {event}")
        if isinstance(event, WorkflowOutputEvent):
            print(f"Workflow completed with result: {event.data}")

if __name__ == "__main__":
    asyncio.run(main())

Schritt 6: Verständnis der Ausgabe

Wenn Sie den Workflow ausführen, werden Ereignisse wie folgt angezeigt:

Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH

Wichtige Konzepte erläutert

Zwei Möglichkeiten zum Definieren von Executoren

  1. Benutzerdefinierte Klasse (UnterklassenbildungExecutor): Am besten geeignet, wenn Sie Lebenszyklus-Hooks oder komplexen Zustand benötigen. ** Definieren Sie eine asynchrone Methode mit dem @handler Decorator.
  2. Funktionsbasiert (@executor Decorator): Am besten geeignet für einfache Schritte. Definieren Sie eine eigenständige asynchrone Funktion mit demselben Signaturmuster.

Beide Ansätze verwenden dieselbe Handlersignatur:

  • Erster Parameter: die eingegebene Eingabe für diesen Knoten
  • Zweiter Parameter: a WorkflowContext[T_Out, T_W_Out]

Workflowkontexttypen

Der WorkflowContext generische Typ definiert, welche Datenflüsse zwischen Executoren fließen:

  • WorkflowContext[T_Out] - Wird für Knoten verwendet, die Nachrichten vom Typ T_Out an nachgeschaltete Knoten über ctx.send_message() senden
  • WorkflowContext[T_Out, T_W_Out] - Wird für Knoten verwendet, die zusätzlich Workflow-Output vom Typ T_W_Out über ctx.yield_output() liefern.
  • WorkflowContext ohne Typparameter entspricht WorkflowContext[Never, Never], d. h., dieser Knoten sendet weder Nachrichten an nachgelagerte Knoten noch liefert Workflowausgabe.

Ereignistypen

Während der Streamingausführung beobachten Sie die folgenden Ereignistypen:

  • ExecutorInvokedEvent - Wenn ein Executor die Verarbeitung startet
  • ExecutorCompletedEvent - Wenn ein Executor die Verarbeitung beendet
  • WorkflowOutputEvent - Enthält das endgültige Workflowergebnis.

Python-Workflow-Generator-Muster

Dies WorkflowBuilder stellt eine Fluent-API zum Erstellen von Workflows bereit:

  • add_edge(): Erstellt gerichtete Verbindungen zwischen Executoren
  • set_start_executor(): Definiert den Workfloweinstiegspunkt.
  • build(): Schließt ein unveränderliches Workflowobjekt ab und gibt es zurück.

Vollständiges Beispiel

Die vollständige, einsatzbereite Implementierung finden Sie im Beispiel im Agent Framework-Repository.

Dieses Beispiel umfasst:

  • Vollständige Implementierung mit allen Importen und Dokumentationen
  • Zusätzliche Kommentare zur Erläuterung der Workflowkonzepte
  • Beispielausgabe mit den erwarteten Ergebnissen

Nächste Schritte