Delen via


Opbouwfunctie voor werkstromen en uitvoering

Een werkstroom koppelt uitvoerders en randen aan elkaar in een gerichte grafiek en beheert de uitvoering. Het coördineert aanroep van uitvoerders, berichtroutering en gebeurtenisstreaming.

Werkstromen bouwen

Werkstromen worden samengesteld met behulp van de WorkflowBuilder klasse, die een fluent API biedt voor het definiëren van de werkstroomstructuur:

using Microsoft.Agents.AI.Workflows;

var processor = new DataProcessor();
var validator = new Validator();
var formatter = new Formatter();

// Build workflow
WorkflowBuilder builder = new(processor); // Set starting executor
builder.AddEdge(processor, validator);
builder.AddEdge(validator, formatter);
var workflow = builder.Build<string>(); // Specify input message type

Werkstromen worden samengesteld met behulp van de WorkflowBuilder klasse:

from agent_framework import WorkflowBuilder

processor = DataProcessor()
validator = Validator()
formatter = Formatter()

# Build workflow
builder = WorkflowBuilder(start_executor=processor)
builder.add_edge(processor, validator)
builder.add_edge(validator, formatter)
workflow = builder.build()

Werkstroomuitvoering

Werkstromen ondersteunen zowel streaming- als niet-streaminguitvoeringsmodi:

using Microsoft.Agents.AI.Workflows;

// Streaming execution — get events as they happen
StreamingRun run = await InProcessExecution.StreamAsync(workflow, inputMessage);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
    if (evt is ExecutorCompleteEvent executorComplete)
    {
        Console.WriteLine($"{executorComplete.ExecutorId}: {executorComplete.Data}");
    }

    if (evt is WorkflowOutputEvent outputEvt)
    {
        Console.WriteLine($"Workflow completed: {outputEvt.Data}");
    }
}

// Non-streaming execution — wait for completion
Run result = await InProcessExecution.RunAsync(workflow, inputMessage);
foreach (WorkflowEvent evt in result.NewEvents)
{
    if (evt is WorkflowOutputEvent outputEvt)
    {
        Console.WriteLine($"Final result: {outputEvt.Data}");
    }
}
# Streaming execution — get events as they happen
async for event in workflow.run_stream(input_message):
    if event.type == "output":
        print(f"Workflow completed: {event.data}")

# Non-streaming execution — wait for completion
events = await workflow.run(input_message)
print(f"Final result: {events.get_outputs()}")

Werkstroomvalidatie

Het framework voert uitgebreide validatie uit bij het bouwen van werkstromen:

  • Typecompatibiliteit: zorgt ervoor dat berichttypen compatibel zijn tussen verbonden uitvoerders
  • Graph Connectivity: controleert of alle uitvoerders bereikbaar zijn vanaf de startuitvoerer
  • Executorkoppeling: bevestigt dat alle executors correct zijn gekoppeld en geïnstantieerd
  • Edge-validatie: controleert op dubbele randen en ongeldige verbindingen

Uitvoeringsmodel: Supersteps

Het framework maakt gebruik van een gewijzigd Pregel-uitvoeringsmodel, een BSP-benadering (Bulk Synchronous Parallel) met verwerking op basis van superstep.

Hoe Supersteps werken

De uitvoering van de werkstroom is ingedeeld in discrete supersteps. Elke superstep:

  1. Verzamelt alle berichten die in behandeling zijn van de vorige superstap
  2. Hiermee worden berichten gerouteerd naar doelexecutors op basis van edge-definities
  3. Voert alle doelexecutors gelijktijdig uit binnen de superstep
  4. Wacht tot alle uitvoerders zijn voltooid voordat wordt doorgegaan (synchronisatiebarrière)
  5. Hiermee worden nieuwe berichten die door uitvoerders worden verzonden in de wachtrij geplaatst voor de volgende superstep
Superstep N:
┌─────────────────┐    ┌─────────────────┐    ┌─────────────────┐
│  Collect All    │───▶│  Route Messages │───▶│  Execute All    │
│  Pending        │    │  Based on Type  │    │  Target         │
│  Messages       │    │  & Conditions   │    │  Executors      │
└─────────────────┘    └─────────────────┘    └─────────────────┘
                                                       │
                                                       │ (barrier: wait for all)
┌─────────────────┐    ┌─────────────────┐             │
│  Start Next     │◀───│  Emit Events &  │◀────────────┘
│  Superstep      │    │  New Messages   │
└─────────────────┘    └─────────────────┘

Synchronisatiebarrière

Het belangrijkste kenmerk is de synchronisatiebarrière tussen supersteps. Binnen één superstap worden alle geactiveerde uitvoerders parallel uitgevoerd, maar de werkstroom gaat pas verder naar de volgende superstep als elke uitvoerder is voltooid.

Dit is van invloed op uitwaaipatronen: als u uitwaaiert naar meerdere paden ( één met een keten van uitvoerders en een andere met één langlopende uitvoerder), kan het gekoppelde pad pas verdergaan als de langlopende uitvoerder is voltooid.

Waarom Supersteps?

Het BSP-model biedt belangrijke garanties:

  • Deterministische uitvoering: Op basis van dezelfde invoer wordt de werkstroom altijd in dezelfde volgorde uitgevoerd
  • Betrouwbare controlepunten: Status kan worden opgeslagen bij superstepgrenzen voor fouttolerantie
  • Eenvoudiger redeneren: geen racevoorwaarden tussen supersteps; elk ziet een consistente weergave van berichten

Werken met het Superstep-model

Als u echt onafhankelijke parallelle paden nodig hebt die elkaar niet blokkeren, voegt u opeenvolgende stappen samen in één uitvoerprogramma. In plaats van de logica te koppelen step1 → step2 → step3, combineert u die logica in één uitvoerder. Beide parallelle paden worden vervolgens uitgevoerd binnen één superstep.

Volgende stappen

Verwante onderwerpen: