Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Een werkstroom koppelt uitvoerders en randen aan elkaar in een gerichte grafiek en beheert de uitvoering. Het coördineert aanroep van uitvoerders, berichtroutering en gebeurtenisstreaming.
Werkstromen bouwen
Werkstromen worden samengesteld met behulp van de WorkflowBuilder klasse, die een fluent API biedt voor het definiëren van de werkstroomstructuur:
using Microsoft.Agents.AI.Workflows;
var processor = new DataProcessor();
var validator = new Validator();
var formatter = new Formatter();
// Build workflow
WorkflowBuilder builder = new(processor); // Set starting executor
builder.AddEdge(processor, validator);
builder.AddEdge(validator, formatter);
var workflow = builder.Build<string>(); // Specify input message type
Werkstromen worden samengesteld met behulp van de WorkflowBuilder klasse:
from agent_framework import WorkflowBuilder
processor = DataProcessor()
validator = Validator()
formatter = Formatter()
# Build workflow
builder = WorkflowBuilder(start_executor=processor)
builder.add_edge(processor, validator)
builder.add_edge(validator, formatter)
workflow = builder.build()
Werkstroomuitvoering
Werkstromen ondersteunen zowel streaming- als niet-streaminguitvoeringsmodi:
using Microsoft.Agents.AI.Workflows;
// Streaming execution — get events as they happen
StreamingRun run = await InProcessExecution.StreamAsync(workflow, inputMessage);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
if (evt is ExecutorCompleteEvent executorComplete)
{
Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
}
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Workflow completed: {outputEvt.Data}");
}
}
// Non-streaming execution — wait for completion
Run result = await InProcessExecution.RunAsync(workflow, inputMessage);
foreach (WorkflowEvent evt in result.NewEvents)
{
if (evt is WorkflowOutputEvent outputEvt)
{
Console.WriteLine($"Final result: {outputEvt.Data}");
}
}
# Streaming execution — get events as they happen
async for event in workflow.run_stream(input_message):
if event.type == "output":
print(f"Workflow completed: {event.data}")
# Non-streaming execution — wait for completion
events = await workflow.run(input_message)
print(f"Final result: {events.get_outputs()}")
Werkstroomvalidatie
Het framework voert uitgebreide validatie uit bij het bouwen van werkstromen:
- Typecompatibiliteit: zorgt ervoor dat berichttypen compatibel zijn tussen verbonden uitvoerders
- Graph Connectivity: controleert of alle uitvoerders bereikbaar zijn vanaf de startuitvoerer
- Executorkoppeling: bevestigt dat alle executors correct zijn gekoppeld en geïnstantieerd
- Edge-validatie: controleert op dubbele randen en ongeldige verbindingen
Uitvoeringsmodel: Supersteps
Het framework maakt gebruik van een gewijzigd Pregel-uitvoeringsmodel, een BSP-benadering (Bulk Synchronous Parallel) met verwerking op basis van superstep.
Hoe Supersteps werken
De uitvoering van de werkstroom is ingedeeld in discrete supersteps. Elke superstep:
- Verzamelt alle berichten die in behandeling zijn van de vorige superstap
- Hiermee worden berichten gerouteerd naar doelexecutors op basis van edge-definities
- Voert alle doelexecutors gelijktijdig uit binnen de superstep
- Wacht tot alle uitvoerders zijn voltooid voordat wordt doorgegaan (synchronisatiebarrière)
- Hiermee worden nieuwe berichten die door uitvoerders worden verzonden in de wachtrij geplaatst voor de volgende superstep
Superstep N:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ Collect All │───▶│ Route Messages │───▶│ Execute All │
│ Pending │ │ Based on Type │ │ Target │
│ Messages │ │ & Conditions │ │ Executors │
└─────────────────┘ └─────────────────┘ └─────────────────┘
│
│ (barrier: wait for all)
┌─────────────────┐ ┌─────────────────┐ │
│ Start Next │◀───│ Emit Events & │◀────────────┘
│ Superstep │ │ New Messages │
└─────────────────┘ └─────────────────┘
Synchronisatiebarrière
Het belangrijkste kenmerk is de synchronisatiebarrière tussen supersteps. Binnen één superstap worden alle geactiveerde uitvoerders parallel uitgevoerd, maar de werkstroom gaat pas verder naar de volgende superstep als elke uitvoerder is voltooid.
Dit is van invloed op uitwaaipatronen: als u uitwaaiert naar meerdere paden ( één met een keten van uitvoerders en een andere met één langlopende uitvoerder), kan het gekoppelde pad pas verdergaan als de langlopende uitvoerder is voltooid.
Waarom Supersteps?
Het BSP-model biedt belangrijke garanties:
- Deterministische uitvoering: Op basis van dezelfde invoer wordt de werkstroom altijd in dezelfde volgorde uitgevoerd
- Betrouwbare controlepunten: Status kan worden opgeslagen bij superstepgrenzen voor fouttolerantie
- Eenvoudiger redeneren: geen racevoorwaarden tussen supersteps; elk ziet een consistente weergave van berichten
Werken met het Superstep-model
Als u echt onafhankelijke parallelle paden nodig hebt die elkaar niet blokkeren, voegt u opeenvolgende stappen samen in één uitvoerprogramma. In plaats van de logica te koppelen step1 → step2 → step3, combineert u die logica in één uitvoerder. Beide parallelle paden worden vervolgens uitgevoerd binnen één superstep.
Volgende stappen
Verwante onderwerpen:
- Uitvoerders : verwerkingseenheden in een werkstroom
- Randen : verbindingen tussen uitvoerders
- Gebeurtenissen : waarneembaarheid van werkstromen
- Statusbeheer