Kommentar
Åtkomst till den här sidan kräver auktorisering. Du kan prova att logga in eller ändra kataloger.
Åtkomst till den här sidan kräver auktorisering. Du kan prova att ändra kataloger.
Ett underarbetsflöde är ett fullständigt arbetsflöde som körs som en exekutor inom ett överordnat arbetsflöde. På så sätt kan du skapa komplexa system från mindre, återanvändbara arbetsflödesbyggstenar – var och en med sin egen isolerade körningskontext, tillståndshantering och meddelanderoutning.
Översikt
Underarbetsflöden är användbara när du vill:
- Dela upp komplexitet – dela upp ett stort arbetsflöde i mindre, oberoende testbara enheter.
- Återanvänd arbetsflödeslogik – bädda in samma underarbetsflöde i flera överordnade arbetsflöden.
- Isolerat tillstånd – håll varje underarbetsflödes interna tillstånd separat från det överordnade.
- Kontrollera dataflödet – meddelanden anger och lämnar underarbetsflödet endast via dess kanter, utan sändning över nivåer.
När ett underarbetsflöde läggs till i ett överordnat arbetsflöde fungerar det som alla andra köre: det tar emot indatameddelanden, kör sin interna graf till slutförande och genererar utdatameddelanden för underordnade utförare.
Skapa ett Delarbetsflöde
I C# skapar du underarbetsflöden på två sätt:
-
Direktbindning – används
BindAsExecutor()för att bädda in ett arbetsflöde direkt som köre i det överordnade arbetsflödet. Detta bevarar underarbetsflödets interna indata-/utdatatyper. -
Agentomslutning – använd
AsAIAgent()för att konvertera ett arbetsflöde till en agent och lägg sedan till agenten i det överordnade arbetsflödet. Detta är användbart när det överordnade arbetsflödet använder agentbaserade utförare.
Direktbindning med BindAsExecutor
Tilläggsmetoden BindAsExecutor() konverterar ett arbetsflöde till ett ExecutorBinding som kan läggas till direkt i ett överordnat arbetsflöde:
using Microsoft.Agents.AI.Workflows;
// Create executors for the inner workflow
UppercaseExecutor uppercase = new();
ReverseExecutor reverse = new();
AppendSuffixExecutor append = new(" [PROCESSED]");
// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(uppercase)
.AddEdge(uppercase, reverse)
.AddEdge(reverse, append)
.WithOutputFrom(append)
.Build();
// Bind the inner workflow as an executor
ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("TextProcessingSubWorkflow");
// Build the parent workflow using the sub-workflow executor
PrefixExecutor prefix = new("INPUT: ");
PostProcessExecutor postProcess = new();
var parentWorkflow = new WorkflowBuilder(prefix)
.AddEdge(prefix, subWorkflowExecutor)
.AddEdge(subWorkflowExecutor, postProcess)
.WithOutputFrom(postProcess)
.Build();
Med BindAsExecutorbevaras underarbetsflödets typinmatnings- och utdatatyper – det överordnade arbetsflödet dirigerar meddelanden baserat på de faktiska typer som underarbetsflödet förväntar sig och producerar.
Agentinkapsling med AsAIAgent
När det överordnade arbetsflödet använder agentbaserade utförare konverterar du det inre arbetsflödet till en agent med .AsAIAgent()
WorkflowBuilder omsluter automatiskt agenten i en exekutor:
using Microsoft.Agents.AI;
using Microsoft.Agents.AI.Workflows;
// Create agents for the inner workflow
AIAgent specialist1 = chatClient.AsAIAgent("You are specialist 1. Analyze the data.");
AIAgent specialist2 = chatClient.AsAIAgent("You are specialist 2. Validate the analysis.");
// Build the inner workflow
var innerWorkflow = new WorkflowBuilder(specialist1)
.AddEdge(specialist1, specialist2)
.Build();
// Convert the inner workflow to an agent
AIAgent innerWorkflowAgent = innerWorkflow.AsAIAgent(
id: "analysis-pipeline",
name: "Analysis Pipeline",
description: "A sub-workflow that analyzes and validates data"
);
// Create agents for the parent workflow
AIAgent coordinator = chatClient.AsAIAgent("You are a coordinator. Delegate tasks to the team.");
AIAgent reviewer = chatClient.AsAIAgent("You are a reviewer. Review the final output.");
// Build the parent workflow with the sub-workflow
var parentWorkflow = new WorkflowBuilder(coordinator)
.AddEdge(coordinator, innerWorkflowAgent)
.AddEdge(innerWorkflowAgent, reviewer)
.Build();
Det inre arbetsflödet körs som ett enda steg från det överordnade arbetsflödets perspektiv. Koordinatorn skickar meddelanden till analyspipelinen, som internt kör specialist1 → specialist2och vidarebefordrar sedan resultatet till granskaren.
Tips/Råd
Använd BindAsExecutor() när du arbetar med inskrivna utförare och AsAIAgent() när du arbetar med agentbaserade arbetsflöden. Mer information om hur du konfigurerar konverteringen från arbetsflöde till agent finns i Arbetsflöden som agenter.
Indata- och utdatatyper
När ett arbetsflöde används som ett underarbetsflöde bevaras typkontrakten för dess interna utförare.
Med BindAsExecutoraccepterar underarbetsflödeskörningen samma indatatyper som det inre arbetsflödets startexekutor och skickar samma utdatatyper som det inre arbetsflödet skapar. Det överordnade arbetsflödets kanter måste ansluta utförare vars utdatatyper matchar underarbetsflödets förväntade indatatyper, och underarbetsflödets utdatatyper måste matcha de förväntade indata som förväntas av underordnade utförare.
Med AsAIAgent omsluts underarbetsflödet som en agent och följer Agent Executor-indata-/utdatakontrakt (string, ChatMessage, IEnumerable<ChatMessage>).
Utdatabeteende
Som standard vidarebefordras dessa utdata som meddelanden till anslutna utförare i det överordnade arbetsflödet när ett underarbetsflöde skapar utdata (via YieldOutputAsync). Detta gör det möjligt för underordnade utförare att bearbeta underarbetsflödesresultat.
Klassen ExecutorOptions styr det här beteendet:
| Option | Standardinställning | Beskrivning |
|---|---|---|
AutoSendMessageHandlerResultObject |
true |
Vidarebefordra utdata från underarbetsflödet som meddelanden till anslutna utförare i det överordnade diagrammet. |
AutoYieldOutputHandlerResultObject |
false |
Ge utdata från underarbetsflödet direkt till det överordnade arbetsflödets utdatahändelseström. |
När AutoYieldOutputHandlerResultObject är aktiverat kringgår utdata från underarbetsflödet den överordnades interna routning och levereras direkt till anroparen för det överordnade arbetsflödet.
var options = new ExecutorOptions
{
AutoYieldOutputHandlerResultObject = true,
};
ExecutorBinding subWorkflowExecutor = innerWorkflow.BindAsExecutor("SubWorkflow", options);
Begäranden och svar
Underarbetsflöden har fullt stöd för mekanismen för begäran och svar . När en exekverare i underarbetsflödet skickar en begäran (till exempel för att begära mänsklig indata) WorkflowHostExecutor vidarebefordras RequestInfoEvent till det överordnade arbetsflödet med ett kvalificerat port-ID – delarbetsflödeskörningens ID prefixas med port-ID:t (till exempel SubWorkflow.GuessNumber).
Den här kvalificeringen säkerställer att när det överordnade arbetsflödet får ett svar kan svaret dirigeras tillbaka till rätt instans av underarbetsflödet. Det överordnade arbetsflödet hanterar begäranden under arbetsflödet med samma svarsmekanism som andra begäranden:
await using StreamingRun handle = await InProcessExecution.RunStreamingAsync(parentWorkflow, input);
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
switch (evt)
{
case RequestInfoEvent requestInfoEvt:
// The request may originate from the sub-workflow
// Handle it and send the response back
var response = requestInfoEvt.Request.CreateResponse(myResponseData);
await handle.SendResponseAsync(response);
break;
case WorkflowOutputEvent outputEvt:
Console.WriteLine($"Output: {outputEvt.Data}");
break;
}
}
Anmärkning
Från den överordnade arbetsflödesuppringarens perspektiv finns det ingen skillnad mellan en begäran från en utförare på den översta nivån och en begäran från ett underarbetsflöde. Ramverket hanterar routningen transparent.
Så här fungerar det
När det överordnade arbetsflödet dirigerar ett meddelande till exekveraren av underarbetsflödet:
-
Indataleverans – meddelandet vidarebefordras till det inre arbetsflödets startexekutor. Med
BindAsExecutormåste meddelandetypen matcha de förväntade typerna för startutföraren. MedAsAIAgentnormaliseras meddelanden tillChatMessageformat. - Inre körning – det inre arbetsflödet har en egen supersteploop.
-
Utdatasamling – det inre arbetsflödets utdatahändelser samlas in. Med
BindAsExecutorbehåller utdata sina ursprungliga typer. MedAsAIAgentkonverteras utdata till agentsvarsmeddelanden. - Vidarebefordran av begäranden – om det inre arbetsflödet har väntande begäranden vidarebefordras de till det överordnade arbetsflödet för hantering (se Begäranden och svar).
- Distribution nedströms – de resulterande meddelandena skickas till nästa exekutor i det överordnade arbetsflödet.
Eftersom det inre arbetsflödet har en egen körningskontext är dess tillstånd oberoende av det överordnade arbetsflödet.
Tips/Råd
Mer information om hur du konfigurerar konverteringen från arbetsflöde till agent, inklusive strömningsbeteende och undantagshantering, finns i Arbetsflöden som agenter.
Kapsling på flera nivåer
Underarbetsflöden kan kapslas till godtyckligt djup. Varje nivå behåller sin egen exekveringskontext:
// Level 1: Data preparation pipeline
var dataPipeline = new WorkflowBuilder(fetcher)
.AddEdge(fetcher, cleaner)
.Build();
AIAgent dataPipelineAgent = dataPipeline.AsAIAgent(
id: "data-pipeline",
name: "Data Pipeline"
);
// Level 2: Analysis pipeline (contains the data pipeline)
var analysisPipeline = new WorkflowBuilder(dataPipelineAgent)
.AddEdge(dataPipelineAgent, analyzer)
.Build();
AIAgent analysisPipelineAgent = analysisPipeline.AsAIAgent(
id: "analysis-pipeline",
name: "Analysis Pipeline"
);
// Level 3: Top-level orchestration
var topWorkflow = new WorkflowBuilder(coordinator)
.AddEdge(coordinator, analysisPipelineAgent)
.AddEdge(analysisPipelineAgent, reporter)
.Build();
Anmärkning
Varje kapslingsnivå lägger till exekveringsbelastning eftersom det inre arbetsflödet kör en egen superstegs-slinga. Håll kapslingsdjupet rimligt för prestandakänsliga scenarier.
Felhantering
När ett underarbetsflöde misslyckas sprids felet till det överordnade arbetsflödet som en SubworkflowErrorEvent. Det överordnade arbetsflödet kan observera dessa fel via händelseströmmen:
await foreach (WorkflowEvent evt in handle.WatchStreamAsync())
{
if (evt is SubworkflowErrorEvent subError)
{
Console.WriteLine($"Sub-workflow '{subError.ExecutorId}' failed: {subError.Data}");
}
}
Om underarbetsflödet stöter på ett ohanterat undantag fortsätter körningen av det överordnade arbetsflödet, men underarbetsflödeskörningen slutar bearbeta ytterligare meddelanden.
Kontrollpunkter
När en kontrollpunkt tas i föräldraarbetsflödet, serialiseras underarbetsflödesagentens sessionstillstånd som en del av föräldraexekverarens kontrollpunktsdata. Vid återställningen deserialiseras sessionstillståndet, vilket gör att det överordnade arbetsflödet kan återupptas med underarbetsflödets tillstånd intakt.
CheckpointManager checkpointManager = CheckpointManager.CreateInMemory();
// Run the parent workflow with checkpointing
StreamingRun run = await InProcessExecution
.RunStreamingAsync(parentWorkflow, input, checkpointManager);
await foreach (WorkflowEvent evt in run.WatchStreamAsync())
{
// Process events, including those from sub-workflows
}
// Resume from a checkpoint
CheckpointInfo checkpoint = run.Checkpoints[^1];
StreamingRun resumedRun = await InProcessExecution
.ResumeStreamingAsync(parentWorkflow, checkpoint, checkpointManager);
Skapa ett Delarbetsflöde
I Python skapar du ett underarbetsflöde genom att omsluta ett Workflow i ett WorkflowExecutor och lägga till det i ett överordnat arbetsflöde.
from agent_framework import WorkflowBuilder, WorkflowExecutor
# Create agents for the inner workflow
specialist1 = client.as_agent(name="Specialist1", instructions="Analyze the data.")
specialist2 = client.as_agent(name="Specialist2", instructions="Validate the analysis.")
# Build the inner workflow
inner_workflow = (
WorkflowBuilder(start_executor=specialist1)
.add_edge(specialist1, specialist2)
.build()
)
# Wrap as an executor
inner_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
)
# Create agents for the parent workflow
coordinator = client.as_agent(name="Coordinator", instructions="Delegate tasks to the team.")
reviewer = client.as_agent(name="Reviewer", instructions="Review the final output.")
# Build the parent workflow with the sub-workflow
parent_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, inner_workflow_executor)
.add_edge(inner_workflow_executor, reviewer)
.build()
)
Det inre arbetsflödet körs som ett enda steg från det överordnade arbetsflödets perspektiv. Koordinatorn skickar meddelanden till analyspipelinen, som internt kör specialist1 → specialist2och vidarebefordrar sedan resultatet till granskaren.
WorkflowExecutor-Parametrar
| Parameter | Type | Standardinställning | Beskrivning |
|---|---|---|---|
workflow |
Workflow |
— | Arbetsflödesinstansen som ska omslutas som en köre. |
id |
str |
— | Unikt identifierare för den här köraren. |
allow_direct_output |
bool |
False |
När Truereturneras utdata från underarbetsflödet direkt till det överordnade arbetsflödets händelseström i stället för att skickas som meddelanden till anslutna utförare. |
propagate_request |
bool |
False |
När Truesprids begäranden från underarbetsflödet till det överordnade arbetsflödets händelseström som vanliga informationshändelser för begäranden. När False slutförs, omsluts begäranden i SubWorkflowRequestMessage för avlyssning av överordnade utförare. |
Implicit eller explicit omslutning
Kan WorkflowBuilder automatiskt omsluta Workflow instanser i en WorkflowExecutor när du skickar dem direkt. Detta liknar hur Agent instanser automatiskt omsluts i AgentExecutor.
# Implicit wrapping — WorkflowBuilder detects the Workflow and wraps it
parent_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, inner_workflow) # Workflow auto-wrapped
.add_edge(inner_workflow, reviewer)
.build()
)
Använd explicit omslutning när du behöver:
- Tilldela ett specifikt exekutor-ID att användas som referens i flera kanter.
- Återanvänd samma
WorkflowExecutorinstans i diagrammet.
# Explicit wrapping — create the WorkflowExecutor yourself
inner_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
)
parent_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, inner_workflow_executor)
.add_edge(inner_workflow_executor, reviewer)
.build()
)
Indata- och utdatatyper
Ärver WorkflowExecutor sin typsignatur från det omslutna arbetsflödet:
-
Indatatyper matchar det omslutna arbetsflödets startexekutorindatatyper (plus
SubWorkflowResponseMessageför hantering av svar på vidarebefordrade begäranden). -
Utdatatyperna matchar utdatatyperna för det omslutna arbetsflödet. Om någon exekverare i underarbetsflödet är begär-svar-kompatibel,
SubWorkflowRequestMessageinkluderas även som en utdatatyp.
Det innebär att det överordnade arbetsflödets kanter måste ansluta utförare vars utdatatyper matchar underarbetsflödets förväntade indatatyper. På samma sätt måste underordnade utförare acceptera de typer som underarbetsflödet skapar:
# The sub-workflow's start executor accepts TextProcessingRequest
# So the parent executor must send TextProcessingRequest
class Orchestrator(Executor):
@handler
async def start(self, texts: list[str], ctx: WorkflowContext[TextProcessingRequest]) -> None:
for text in texts:
await ctx.send_message(TextProcessingRequest(text=text))
# The sub-workflow yields TextProcessingResult
# So the downstream executor must handle TextProcessingResult
class ResultCollector(Executor):
@handler
async def collect(self, result: TextProcessingResult, ctx: WorkflowContext) -> None:
print(f"Received: {result}")
Utdatabeteende
Som standard (allow_direct_output=False) vidarebefordras dessa utdata som meddelanden till anslutna utförare i det överordnade arbetsflödet med hjälp av yield_outputnär ett underarbetsflöde skapar utdata via send_message. Detta gör det möjligt för underordnade utförare att bearbeta delarbetsflödesresultat som en del av det överordnade diagrammet.
När allow_direct_output=Truereturneras utdata från underarbetsflödet direkt till det överordnade arbetsflödets händelseström. Utdata från underarbetsflödet blir utdata från det överordnade arbetsflödet och kringgår den överordnades interna körningsroutning:
# Outputs go directly to parent's event stream
sub_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
allow_direct_output=True,
)
# The caller receives sub-workflow outputs directly
async for event in parent_workflow.run(input_data, stream=True):
if event.type == "output":
# This output came from the sub-workflow
print(event.data)
Begäranden och svar
Underarbetsflöden har fullt stöd för mekanismen för begäran och svar . När en verkställare inom ett underarbetsflöde anropar ctx.request_info() så fångar WorkflowExecutor begäran och hanterar den baserat på inställningen propagate_request.
Avlyssna Begäranden i det Överordnade Arbetsflödet (Standard)
Med propagate_request=False (standardinställningen) omsluts begäranden från underarbetsflödet i en SubWorkflowRequestMessage och skickas till anslutna utförare i det överordnade arbetsflödet. På så sätt kan överordnade utförare hantera begäran lokalt:
from agent_framework import (
SubWorkflowRequestMessage,
SubWorkflowResponseMessage,
)
class ParentHandler(Executor):
@handler
async def handle_request(
self,
request: SubWorkflowRequestMessage,
ctx: WorkflowContext[SubWorkflowResponseMessage],
) -> None:
# Inspect the original request from the sub-workflow
original_data = request.source_event.data
# Create and send a response back to the sub-workflow
response = request.create_response(my_response_data)
await ctx.send_message(response, target_id=request.executor_id)
Metoden create_response() verifierar att svarsdatatypen matchar den förväntade typen från den ursprungliga begäran. Om typerna inte matchar genereras en TypeError .
Viktigt!
När du skickar tillbaka svaret använder du target_id=request.executor_id för att dirigera SubWorkflowResponseMessage till rätt WorkflowExecutor instans.
Sprida begäranden till externa anropare
Med propagate_request=Truesprids begäranden från underarbetsflödet till det överordnade arbetsflödets händelseström med hjälp av standardmekanismen request_info . Det överordnade arbetsflödets anropare hanterar dessa begäranden på samma sätt som andra mänskliga begäranden i loopen:
sub_workflow_executor = WorkflowExecutor(
workflow=inner_workflow,
id="analysis-pipeline",
propagate_request=True,
)
# Run the parent workflow and handle propagated requests
result = await parent_workflow.run(input_data)
request_info_events = result.get_request_info_events()
if request_info_events:
responses = {}
for event in request_info_events:
# Handle each request (e.g., ask a human)
responses[event.request_id] = get_human_response(event.data)
result = await parent_workflow.run(responses=responses)
Så här fungerar det
När det överordnade arbetsflödet dirigerar ett meddelande till WorkflowExecutor:
- Indataleverans – meddelandet vidarebefordras till det inre arbetsflödets startexekutor. Meddelandetypen måste matcha startkörarens förväntade indatatyper.
- Inre körning – det inre arbetsflödet kör en egen superstegsloop till fullbordan eller tills den behöver externa indata.
-
Utdatasamling – det inre arbetsflödets utdatahändelser samlas in och vidarebefordras baserat på inställningen
allow_direct_output. -
Vidarebefordran av begäranden – om det inre arbetsflödet har väntande begäranden vidarebefordras de baserat på
propagate_requestinställningen (se Begäranden och svar). -
Svarackumulering — den
WorkflowExecutorsamlar in svar och återupptar underarbetsflödet först när alla förväntade svar för en viss körning har tagits emot. - Nedströmssändning – utdata skickas till nästa köre i det överordnade arbetsflödet.
Underarbetsflödet upprätthåller sitt eget interna tillstånd oberoende av det överordnade. Meddelanden dirigeras endast genom kanterna som ansluter WorkflowExecutor till resten av det överordnade diagrammet – det finns inget meddelande som sänder över kapslingsnivåer.
Kapsling på flera nivåer
Underarbetsflöden kan kapslas till godtyckligt djup. Varje nivå behåller sin egen exekveringskontext:
# Level 1: Data preparation pipeline
data_pipeline = (
WorkflowBuilder(start_executor=fetcher)
.add_edge(fetcher, cleaner)
.build()
)
# Level 2: Analysis pipeline (contains the data pipeline)
analysis_pipeline = (
WorkflowBuilder(start_executor=data_pipeline) # Implicit wrapping
.add_edge(data_pipeline, analyzer)
.build()
)
# Level 3: Top-level orchestration
top_workflow = (
WorkflowBuilder(start_executor=coordinator)
.add_edge(coordinator, analysis_pipeline) # Implicit wrapping
.add_edge(analysis_pipeline, reporter)
.build()
)
Anmärkning
Varje kapslingsnivå lägger till exekveringsbelastning eftersom det inre arbetsflödet kör en egen superstegs-slinga. Håll kapslingsdjupet rimligt för prestandakänsliga scenarier.
Varning
Alla samtidiga körningar av en WorkflowExecutor delar samma underliggande arbetsflödesinstans. Utförare i underarbetsflödet bör vara tillståndslösa för att undvika störning mellan samtida körningar.
Felhantering
När ett underarbetsflöde misslyckas sprids felet till det överordnade arbetsflödet. Samlar WorkflowExecutor in den misslyckade händelsen från underarbetsflödet och konverterar den till en felhändelse i den överordnade kontexten:
async for event in parent_workflow.run(input_data, stream=True):
if event.type == "failed":
print(f"Sub-workflow failed: {event.details.message}")
elif event.type == "output":
print(event.data)
Om underarbetsflödet stöter på ett ohanterat undantag får det överordnade arbetsflödet en felhändelse med undantagsinformationen, inklusive underarbetsflödets ID.
Kontrollpunkter
Underarbetsflöden stöder kontrollpunkter. När en kontrollpunkt tas i det överordnade arbetsflödet, serialiserar WorkflowExecutor sitt interna tillstånd, vilket omfattar det interna arbetsflödets körningsförlopp och eventuella cachelagrade meddelanden. Vid återställning är det här tillståndet deserialiserat, vilket gör att det överordnade arbetsflödet kan återupptas med underarbetsflödet intakt.
from agent_framework import FileCheckpointStorage, WorkflowBuilder
checkpoint_storage = FileCheckpointStorage(storage_path="./checkpoints")
# Build the parent workflow with checkpointing
parent_workflow = (
WorkflowBuilder(
start_executor=coordinator,
checkpoint_storage=checkpoint_storage,
)
.add_edge(coordinator, inner_workflow_executor)
.add_edge(inner_workflow_executor, reviewer)
.build()
)
# Run with automatic checkpointing
async for event in parent_workflow.run("Analyze the dataset", stream=True):
if event.type == "output":
print(event.data)
# Resume from a checkpoint
checkpoints = await checkpoint_storage.list_checkpoints()
async for event in parent_workflow.run(
checkpoint_id=checkpoints[-1].checkpoint_id,
checkpoint_storage=checkpoint_storage,
stream=True,
):
if event.type == "output":
print(event.data)