Partager via


Concepts fondamentaux des flux de travail Microsoft Agent Framework - Edges

Ce document fournit un aperçu détaillé du composant Edges du système de flux de travail Microsoft Agent Framework.

Aperçu

Les arêtes définissent la façon dont les messages circulent entre les exécuteurs avec des conditions facultatives. Ils représentent les connexions dans le graphique de flux de travail et déterminent les chemins de flux de données.

Types d'arêtes

Le cadre prend en charge plusieurs modèles pour l'edge computing :

  1. Arêtes directes : connexions simples un-à-un entre les exécuteurs
  2. Arêtes conditionnelles : arêtes avec des conditions qui déterminent quand les messages doivent circuler
  3. Arêtes à éventail : Un exécuteur envoyant des messages à plusieurs cibles
  4. Edges du ventilateur : plusieurs exécuteurs envoyant des messages à une seule cible

Arêtes directes

La forme la plus simple de connexion entre deux exécuteurs :

using Microsoft.Agents.AI.Workflows;

WorkflowBuilder builder = new(sourceExecutor);
builder.AddEdge(sourceExecutor, targetExecutor);
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(source_executor, target_executor)
builder.set_start_executor(source_executor)
workflow = builder.build()

Arêtes conditionnelles

Arêtes qui s’activent uniquement lorsque certaines conditions sont remplies :

// Route based on message content
builder.AddEdge(
    source: spamDetector,
    target: emailProcessor,
    condition: result => result is SpamResult spam && !spam.IsSpam
);

builder.AddEdge(
    source: spamDetector,
    target: spamHandler,
    condition: result => result is SpamResult spam && spam.IsSpam
);
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.add_edge(spam_detector, email_processor, condition=lambda result: isinstance(result, SpamResult) and not result.is_spam)
builder.add_edge(spam_detector, spam_handler, condition=lambda result: isinstance(result, SpamResult) and result.is_spam)
builder.set_start_executor(spam_detector)
workflow = builder.build()

Arêtes de cas de commutateur

Acheminer les messages vers différents exécuteurs en fonction des conditions :

builder.AddSwitch(routerExecutor, switchBuilder =>
    switchBuilder
        .AddCase(
            message => message.Priority < Priority.Normal,
            executorA
        )
        .AddCase(
            message => message.Priority < Priority.High,
            executorB
        )
        .SetDefault(executorC)
);
from agent_framework import (
    Case,
    Default,
    WorkflowBuilder,
)

builder = WorkflowBuilder()
builder.set_start_executor(router_executor)
builder.add_switch_case_edge_group(
    router_executor,
    [
        Case(
            condition=lambda message: message.priority < Priority.NORMAL,
            target=executor_a,
        ),
        Case(
            condition=lambda message: message.priority < Priority.HIGH,
            target=executor_b,
        ),
        Default(target=executor_c)
    ],
)
workflow = builder.build()

Arêtes de ventilateur

Distribuez des messages d’un exécuteur à plusieurs cibles :

// Send to all targets
builder.AddFanOutEdge(splitterExecutor, targets: [worker1, worker2, worker3]);

// Send to specific targets based on target selector function
builder.AddFanOutEdge(
    source: routerExecutor,
    targetSelector: (message, targetCount) => message.Priority switch
    {
        Priority.High => [0], // Route to first worker only
        Priority.Normal => [1, 2], // Route to workers 2 and 3
        _ => Enumerable.Range(0, targetCount) // Route to all workers
    },
    targets: [highPriorityWorker, normalWorker1, normalWorker2]
);
from agent_framework import WorkflowBuilder

builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(splitter_executor, [worker1, worker2, worker3])
workflow = builder.build()

# Send to specific targets based on partitioner function
builder = WorkflowBuilder()
builder.set_start_executor(splitter_executor)
builder.add_fan_out_edges(
    splitter_executor,
    [worker1, worker2, worker3],
    selection_func=lambda message, target_ids: (
        [0] if message.priority == Priority.HIGH else
        [1, 2] if message.priority == Priority.NORMAL else
        list(range(target_count))
    )
)
workflow = builder.build()

Edges du ventilateur

Collectez des messages à partir de plusieurs sources dans une seule cible :

// Aggregate results from multiple workers
builder.AddFanInEdge(aggregatorExecutor, sources: [worker1, worker2, worker3]);
builder.add_fan_in_edge([worker1, worker2, worker3], aggregator_executor)

Étape suivante