Hinweis
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, sich anzumelden oder das Verzeichnis zu wechseln.
Für den Zugriff auf diese Seite ist eine Autorisierung erforderlich. Sie können versuchen, das Verzeichnis zu wechseln.
In diesem Lernprogramm wird das Erstellen eines einfachen sequenziellen Workflows mithilfe von Agent Framework-Workflows veranschaulicht.
Sequenzielle Workflows sind die Grundlage für die Erstellung komplexer KI-Agent-Systeme. In diesem Lernprogramm wird gezeigt, wie Sie einen einfachen zweistufigen Workflow erstellen, in dem jeder Schritt Daten verarbeitet und an den nächsten Schritt übergibt.
Überblick
In diesem Lernprogramm erstellen Sie einen Workflow mit zwei Executoren:
- Großbuchstaben-Executor – Eingabetext in Großbuchstaben konvertieren
- Umgekehrter Textausführer – Umkehrt den Text und gibt das Endergebnis aus.
Der Workflow veranschaulicht kerne Konzepte wie:
- Erstellen eines benutzerdefinierten Executors mit einem Handler
- Erstellen eines benutzerdefinierten Executors aus einer Funktion
- Verwenden von
WorkflowBuilderzur Verbindung von Ausführenden mit Kanten - Verarbeiten von Daten durch sequenzielle Schritte
- Beobachtung der Workflowausführung durch Ereignisse
Behandelte Konzepte
Voraussetzungen
- .NET 8.0 SDK oder höher
- Für dieses grundlegende Beispiel sind keine externen KI-Dienste erforderlich.
- Eine neue Konsolenanwendung
Schrittweise Implementierung
In den folgenden Abschnitten wird gezeigt, wie Sie den sequenziellen Workflow schritt für Schritt erstellen.
Schritt 1: Installieren von NuGet-Paketen
Installieren Sie zunächst die erforderlichen Pakete für Ihr .NET-Projekt:
dotnet add package Microsoft.Agents.AI.Workflows --prerelease
Schritt 2: Definieren des Großbuchstaben-Executors
Definieren Sie einen Executor, der Text in Großbuchstaben konvertiert:
using System;
using System.Linq;
using System.Threading.Tasks;
using Microsoft.Agents.AI.Workflows;
/// <summary>
/// First executor: converts input text to uppercase.
/// </summary>
Func<string, string> uppercaseFunc = s => s.ToUpperInvariant();
var uppercase = uppercaseFunc.BindExecutor("UppercaseExecutor");
Wichtige Punkte:
- Erstellen einer Funktion, die eine Zeichenfolge verwendet und die Großbuchstabenversion zurückgibt
- Verwenden Sie
BindExecutor(), um einen Executor aus der Funktion zu erstellen
Schritt 3: Definieren des Umgekehrten Textausführers
Definieren Sie einen Executor, der den Text umkehrt:
/// <summary>
/// Second executor: reverses the input text and completes the workflow.
/// </summary>
internal sealed class ReverseTextExecutor() : Executor<string, string>("ReverseTextExecutor")
{
public override ValueTask<string> HandleAsync(string input, IWorkflowContext context, CancellationToken cancellationToken = default)
{
// Reverse the input text
return ValueTask.FromResult(new string(input.Reverse().ToArray()));
}
}
ReverseTextExecutor reverse = new();
Wichtige Punkte:
- Erstelle eine Klasse, die von
Executor<TInput, TOutput>erbt. - Implementieren
HandleAsync(), um die Eingabe zu verarbeiten und die Ausgabe zurückzugeben
Schritt 4: Erstellen und Verbinden des Workflows
Verbinden sie die Executoren mithilfe von WorkflowBuilder:
// Build the workflow by connecting executors sequentially
WorkflowBuilder builder = new(uppercase);
builder.AddEdge(uppercase, reverse).WithOutputFrom(reverse);
var workflow = builder.Build();
Wichtige Punkte:
-
WorkflowBuilderKonstruktor übernimmt den Startausführer -
AddEdge()erstellt eine gerichtete Verbindung von Großbuchstaben zur Umkehrung. -
WithOutputFrom()Gibt an, welche Executoren Workflowausgaben erzeugen -
Build()erstellt den unveränderlichen Workflow.
Schritt 5: Ausführen des Workflows
Führen Sie den Workflow aus, und beobachten Sie die Ergebnisse:
// Execute the workflow with input data
await using Run run = await InProcessExecution.RunAsync(workflow, "Hello, World!");
foreach (WorkflowEvent evt in run.NewEvents)
{
switch (evt)
{
case ExecutorCompletedEvent executorComplete:
Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
break;
}
}
Schritt 6: Verständnis des Workflows-Ergebnisses
Wenn Sie den Workflow ausführen, wird die Ausgabe wie folgt angezeigt:
UppercaseExecutor: HELLO, WORLD!
ReverseTextExecutor: !DLROW ,OLLEH
Die Eingabe "Hello, World!" wird zuerst in Großbuchstaben konvertiert ("HELLO, WORLD!"), dann umgekehrt ("! DLROW ,OLLEH").
Wichtige Konzepte erläutert
Executor-Schnittstelle
Exekutoren für Funktionen
- Verwenden Sie
BindExecutor(), um einen Executor aus einer Funktion zu erstellen
Executors implementieren Executor<TInput, TOutput>:
- TInput: Der Datentyp, den dieser Executor akzeptiert
- TOutput: Der Typ der Daten, die dieser Executor erzeugt
- HandleAsync: Die Methode, die die Eingabe verarbeitet und die Ausgabe zurückgibt
.NET-Workflow-Generator-Muster
Dies WorkflowBuilder stellt eine Fluent-API zum Erstellen von Workflows bereit:
- Konstruktor: Übernimmt den Ausgangs-Executor
- AddEdge(): Erstellt gerichtete Verbindungen zwischen Executoren
- WithOutputFrom(): Gibt an, welche Executoren Workflowausgaben erzeugen
- Build(): Erstellt den endgültigen unveränderlichen Workflow
.NET-Ereignistypen
Während der Ausführung können Sie diese Ereignistypen beobachten:
-
ExecutorCompletedEvent- Wenn ein Executor die Verarbeitung beendet
Vollständiges .NET-Beispiel
Die vollständige, einsatzbereite Implementierung finden Sie im 01_ExecutorsAndEdges Beispiel im Agent Framework-Repository.
Dieses Beispiel umfasst:
- Vollständige Implementierung mit allen using-Anweisungen und Klassenstruktur
- Zusätzliche Kommentare zur Erläuterung der Workflowkonzepte
- Abschließen der Projekteinrichtung und -konfiguration
Überblick
In diesem Lernprogramm erstellen Sie einen Workflow mit zwei Executoren:
- Großbuchstaben-Konvertierer – Konvertiert Eingabetext in Großbuchstaben
- Umgekehrter Textausführer – Umkehrt den Text und gibt das Endergebnis aus.
Der Workflow veranschaulicht kerne Konzepte wie:
- Zwei Möglichkeiten, eine Arbeitseinheit (ein Executor Node) zu definieren:
- Eine benutzerdefinierte Klasse, die von
Executorabgeleitet ist und eine asynchrone Methode, die durch@handlergekennzeichnet ist. - Eine eigenständige asynchrone Funktion, die mit
@executordekoriert ist.
- Eine benutzerdefinierte Klasse, die von
- Verbinden von Executoren mit
WorkflowBuilder - Übergeben von Daten zwischen Schritten mit
ctx.send_message() - Erzeugung der Endausgabe mit
ctx.yield_output() - Streamingereignisse zur Echtzeitbeobachtbarkeit
Behandelte Konzepte
Voraussetzungen
- Python 3.10 oder höher
- Agent Framework Core Python-Paket installiert:
pip install agent-framework-core --pre - Für dieses grundlegende Beispiel sind keine externen KI-Dienste erforderlich.
Schrittweise Implementierung
In den folgenden Abschnitten wird gezeigt, wie Sie den sequenziellen Workflow schritt für Schritt erstellen.
Schritt 1: Importieren erforderlicher Module
Importieren Sie zunächst die erforderlichen Module aus Dem Agent Framework:
import asyncio
from typing_extensions import Never
from agent_framework import WorkflowBuilder, WorkflowContext, WorkflowOutputEvent, executor
Schritt 2: Erstellen des ersten Executors
Erstellen Sie einen Executor, der Text in Großbuchstaben konvertiert, indem Sie einen Executor mit einer Handlermethode implementieren:
class UpperCase(Executor):
def __init__(self, id: str):
super().__init__(id=id)
@handler
async def to_upper_case(self, text: str, ctx: WorkflowContext[str]) -> None:
"""Convert the input to uppercase and forward it to the next node.
Note: The WorkflowContext is parameterized with the type this handler will
emit. Here WorkflowContext[str] means downstream nodes should expect str.
"""
result = text.upper()
# Send the result to the next executor in the workflow.
await ctx.send_message(result)
Wichtige Punkte:
- Mithilfe von
ExecutorUnterklassen können Sie bei Bedarf einen benannten Knoten mit Lifecycle-Hooks definieren. - Der
@handlerDekorateur markiert die asynchrone Methode, die die Arbeit ausführt. - Die Handlersignatur folgt einem Vertrag:
- Der erste Parameter ist die eingegebene Eingabe für diesen Knoten (hier:
text: str) - Der zweite Parameter ist ein
WorkflowContext[T_Out], wobeiT_Outder Datentyp ist, den dieser Knoten überctx.send_message()(hier:str) ausgeben wird.
- Der erste Parameter ist die eingegebene Eingabe für diesen Knoten (hier:
- Innerhalb eines Handlers berechnen Sie in der Regel ein Ergebnis und leiten es an nachgelagerte Knoten mithilfe von
ctx.send_message(result)weiter.
Schritt 3: Erstellen des zweiten Executors
Für einfache Schritte können Sie die Unterklassen überspringen und eine asynchrone Funktion mit demselben Signaturmuster (eingegebene Eingabe + WorkflowContext) definieren und mit @executorversehen. Dadurch wird ein voll funktionsfähiger Knoten erstellt, der in einen Fluss verkabelt werden kann:
@executor(id="reverse_text_executor")
async def reverse_text(text: str, ctx: WorkflowContext[Never, str]) -> None:
"""Reverse the input and yield the workflow output."""
result = text[::-1]
# Yield the final output for this workflow run
await ctx.yield_output(result)
Wichtige Punkte:
- Der
@executorDekorateur wandelt eine eigenständige asynchrone Funktion in einen Workflowknoten um. - Dies
WorkflowContextist mit zwei Typen parametrisiert:-
T_Out = Never: Dieser Knoten sendet keine Nachrichten an nachgeschaltete Knoten. -
T_W_Out = str: Dieser Knoten liefert die Workflowausgabe des Typs.str
-
- Terminalknoten liefern Ausgaben mithilfe von
ctx.yield_output(), um Workflow-Resultate bereitzustellen. - Der Workflow wird abgeschlossen, wenn er in den Leerlauf übergeht (keine Arbeit mehr zu erledigen)
Schritt 4: Erstellen des Workflows
Verbinden sie die Executoren mithilfe von WorkflowBuilder:
upper_case = UpperCase(id="upper_case_executor")
workflow = (
WorkflowBuilder()
.add_edge(upper_case, reverse_text)
.set_start_executor(upper_case)
.build()
)
Wichtige Punkte:
-
add_edge()erstellt gerichtete Verbindungen zwischen Executoren -
set_start_executor()definiert den Einstiegspunkt. -
build()finalisiert den Workflow.
Schritt 5: Ausführen des Workflows mit Streaming
Führen Sie den Workflow aus und beobachten Sie Ereignisse in Echtzeit:
async def main():
# Run the workflow and stream events
async for event in workflow.run_stream("hello world"):
print(f"Event: {event}")
if isinstance(event, WorkflowOutputEvent):
print(f"Workflow completed with result: {event.data}")
if __name__ == "__main__":
asyncio.run(main())
Schritt 6: Verständnis der Ausgabe
Wenn Sie den Workflow ausführen, werden Ereignisse wie folgt angezeigt:
Event: ExecutorInvokedEvent(executor_id=upper_case_executor)
Event: ExecutorCompletedEvent(executor_id=upper_case_executor)
Event: ExecutorInvokedEvent(executor_id=reverse_text_executor)
Event: ExecutorCompletedEvent(executor_id=reverse_text_executor)
Event: WorkflowOutputEvent(data='DLROW OLLEH', source_executor_id=reverse_text_executor)
Workflow completed with result: DLROW OLLEH
Wichtige Konzepte erläutert
Zwei Möglichkeiten zum Definieren von Executoren
-
Benutzerdefinierte Klasse (Unterklassenbildung
Executor): Am besten geeignet, wenn Sie Lebenszyklus-Hooks oder komplexen Zustand benötigen. ** Definieren Sie eine asynchrone Methode mit dem@handlerDecorator. -
Funktionsbasiert (
@executorDecorator): Am besten geeignet für einfache Schritte. Definieren Sie eine eigenständige asynchrone Funktion mit demselben Signaturmuster.
Beide Ansätze verwenden dieselbe Handlersignatur:
- Erster Parameter: die eingegebene Eingabe für diesen Knoten
- Zweiter Parameter: a
WorkflowContext[T_Out, T_W_Out]
Workflowkontexttypen
Der WorkflowContext generische Typ definiert, welche Datenflüsse zwischen Executoren fließen:
-
WorkflowContext[T_Out]- Wird für Knoten verwendet, die Nachrichten vom TypT_Outan nachgeschaltete Knoten überctx.send_message()senden -
WorkflowContext[T_Out, T_W_Out]- Wird für Knoten verwendet, die zusätzlich Workflow-Output vom TypT_W_Outüberctx.yield_output()liefern. -
WorkflowContextohne Typparameter entsprichtWorkflowContext[Never, Never], d. h., dieser Knoten sendet weder Nachrichten an nachgelagerte Knoten noch liefert Workflowausgabe.
Ereignistypen
Während der Streamingausführung beobachten Sie die folgenden Ereignistypen:
-
ExecutorInvokedEvent- Wenn ein Executor die Verarbeitung startet -
ExecutorCompletedEvent- Wenn ein Executor die Verarbeitung beendet -
WorkflowOutputEvent- Enthält das endgültige Workflowergebnis.
Python-Workflow-Generator-Muster
Dies WorkflowBuilder stellt eine Fluent-API zum Erstellen von Workflows bereit:
- add_edge(): Erstellt gerichtete Verbindungen zwischen Executoren
- set_start_executor(): Definiert den Workfloweinstiegspunkt.
- build(): Schließt ein unveränderliches Workflowobjekt ab und gibt es zurück.
Vollständiges Beispiel
Die vollständige, einsatzbereite Implementierung finden Sie im Beispiel im Agent Framework-Repository.
Dieses Beispiel umfasst:
- Vollständige Implementierung mit allen Importen und Dokumentationen
- Zusätzliche Kommentare zur Erläuterung der Workflowkonzepte
- Beispielausgabe mit den erwarteten Ergebnissen