Notitie
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen u aan te melden of de directory te wijzigen.
Voor toegang tot deze pagina is autorisatie vereist. U kunt proberen de mappen te wijzigen.
Dit document bevat een overzicht van de status in het Werkstroomsysteem van Microsoft Agent Framework.
Overzicht
Met de status kunnen meerdere uitvoerders in een werkstroom algemene gegevens openen en wijzigen. Deze functie is essentieel voor scenario's waarbij verschillende onderdelen van de werkstroom informatie moeten delen waarbij direct message passing niet haalbaar of efficiënt is.
Schrijven naar status
using Microsoft.Agents.AI.Workflows;
internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
/// <summary>
/// Reads a file and stores its content in a shared state.
/// </summary>
/// <param name="message">The path to the embedded resource file.</param>
/// <param name="context">The workflow context for accessing shared states.</param>
/// <returns>The ID of the shared state where the file content is stored.</returns>
[MessageHandler]
private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
{
// Read file content from embedded resource
string fileContent = File.ReadAllText(message);
// Store file content in a shared state for access by other executors
string fileID = Guid.NewGuid().ToString();
await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");
return fileID;
}
}
from agent_framework import (
Executor,
WorkflowContext,
handler,
)
class FileReadExecutor(Executor):
@handler
async def handle(self, file_path: str, ctx: WorkflowContext[str]):
# Read file content from embedded resource
with open(file_path, 'r') as file:
file_content = file.read()
# Store file content in state for access by other executors
file_id = str(uuid.uuid4())
ctx.set_state(file_id, file_content)
await ctx.send_message(file_id)
Toegangsstatus
using Microsoft.Agents.AI.Workflows;
internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
/// <summary>
/// Counts the number of words in the file content stored in a shared state.
/// </summary>
/// <param name="message">The ID of the shared state containing the file content.</param>
/// <param name="context">The workflow context for accessing shared states.</param>
/// <returns>The number of words in the file content.</returns>
[MessageHandler]
private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
{
// Retrieve the file content from the shared state
var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
?? throw new InvalidOperationException("File content state not found");
return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
}
}
from agent_framework import (
Executor,
WorkflowContext,
handler,
)
class WordCountingExecutor(Executor):
@handler
async def handle(self, file_id: str, ctx: WorkflowContext[int]):
# Retrieve the file content from state
file_content = ctx.get_state(file_id)
if file_content is None:
raise ValueError("File content state not found")
await ctx.send_message(len(file_content.split()))
Statusisolatie
In echte toepassingen is het goed beheren van de status essentieel bij het verwerken van meerdere taken of aanvragen. Zonder de juiste isolatie kan de gedeelde status tussen verschillende werkstroomuitvoeringen leiden tot onverwacht gedrag, beschadiging van gegevens en racevoorwaarden. In deze sectie wordt uitgelegd hoe u statusisolatie in Microsoft Agent Framework-werkstromen kunt garanderen, met inzicht in best practices en veelvoorkomende valkuilen.
Veranderlijke werkstroombouwers versus onveranderbare werkstromen
Werkstromen worden gemaakt door werkstroombouwers. Werkstroombouwers worden over het algemeen als veranderlijk beschouwd, waarbij men de startuitvoering of andere configuraties kan toevoegen of wijzigen nadat de bouwer is gemaakt of zelfs nadat de werkstroom is opgebouwd. Aan de andere kant zijn werkstromen onveranderbaar in dat wanneer een werkstroom is gebouwd, deze niet kan worden gewijzigd (geen openbare API om een werkstroom te wijzigen).
Dit onderscheid is belangrijk omdat dit van invloed is op de manier waarop de status wordt beheerd in verschillende werkstroomuitvoeringen. Het wordt niet aanbevolen om één werkstroomexemplaar opnieuw te gebruiken voor meerdere opdrachten of verzoeken, omdat dit kan leiden tot onbedoeld delen van de toestand. In plaats daarvan wordt u aangeraden een nieuw werkstroomexemplaar te maken van het builder-object voor elke taak of verzoek om de juiste statusisolatie en thread-veiligheid te garanderen.
Statusisolatie garanderen met Helper-methoden
Wanneer executor-exemplaren eenmaal worden gemaakt en gedeeld in meerdere werkstroomconstructies, wordt de interne status gedeeld door alle werkstroomuitvoeringen. Dit kan leiden tot problemen als een uitvoerder een mutabele staat bevat die per werkstroom moet worden geïsoleerd. Om de juiste statusisolatie en threadveiligheid te garanderen, verpakt u het instantiërings- en werkstroomgebouw binnen een helpermethode, zodat elke aanroep nieuwe, onafhankelijke exemplaren produceert.
Binnenkort beschikbaar...
Niet-geïsoleerd voorbeeld (gedeelde status):
executor_a = CustomExecutorA()
executor_b = CustomExecutorB()
# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)
workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state
Geïsoleerd voorbeeld (helpermethode):
def create_workflow() -> Workflow:
"""Create a fresh workflow with isolated state.
Each call produces independent executor instances, ensuring no state
leaks between workflow runs.
"""
executor_a = CustomExecutorA()
executor_b = CustomExecutorB()
return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()
# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()
Aanbeveling
Om de juiste statusisolatie en threadveiligheid te garanderen, moet u er ook voor zorgen dat uitvoerders die in de helpermethode zijn gemaakt, geen externe onveranderbare status delen.
Statusbeheer van agent
Agentcontext wordt beheerd via agentthreads. Standaard krijgt elke agent in een werkstroom een eigen thread, tenzij de agent wordt beheerd door een aangepaste uitvoerder. Raadpleeg Werken met agents voor meer informatie.
Agentthreads worden behouden over werkstroomruns. Dit betekent dat als een agent wordt aangeroepen in de eerste uitvoering van een werkstroom, inhoud die door de agent wordt gegenereerd, beschikbaar is in volgende uitvoeringen van hetzelfde werkstroomexemplaren. Hoewel dit handig kan zijn voor het onderhouden van continuïteit binnen één taak, kan dit ook leiden tot onbedoeld delen van statussen als hetzelfde werkstroomexemplaren opnieuw worden gebruikt voor verschillende taken of aanvragen. Om ervoor te zorgen dat elke taak een geïsoleerde agentstatus heeft, verpakt u het maken van agents en werkstromen in een helpermethode, zodat elke aanroep nieuwe agentexemplaren produceert met hun eigen threads.
Binnenkort beschikbaar...
Niet-geïsoleerd voorbeeld (status van gedeelde agent):
writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
)
# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
Geïsoleerd voorbeeld (helpermethode):
def create_workflow() -> Workflow:
"""Create a fresh workflow with isolated agent state.
Each call produces new agent instances with their own threads,
ensuring no conversation history leaks between workflow runs.
"""
writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content writer. You create new content and edit contents based on the feedback."
),
name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
instructions=(
"You are an excellent content reviewer."
"Provide actionable feedback to the writer about the provided content."
"Provide the feedback in the most concise manner possible."
),
name="reviewer_agent",
)
return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()
# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()
Samenvatting
Statusisolatie in Microsoft Agent Framework-werkstromen kan effectief worden beheerd door het verpakken van uitvoerders en het instantiëren van agents, samen met het bouwen van werkstromen binnen helpermethoden. Door de helpermethode aan te roepen telkens wanneer u een nieuwe werkstroom nodig hebt, zorgt u ervoor dat elk exemplaar een nieuwe, onafhankelijke status heeft en onbedoeld delen van statussen tussen verschillende werkstroomuitvoeringen vermijdt.