Udostępnij przez


Przepływy pracy programu Microsoft Agent Framework — stan

Ten dokument zawiera omówienie stanu w systemie przepływu pracy programu Microsoft Agent Framework.

Przegląd

Stan umożliwia wielu funkcji wykonawczych w przepływie pracy uzyskiwanie dostępu do typowych danych i modyfikowanie ich. Ta funkcja jest niezbędna w scenariuszach, w których różne części przepływu pracy muszą udostępniać informacje, w których przekazywanie komunikatów bezpośrednich nie jest możliwe lub wydajne.

Zapisywanie do stanu

using Microsoft.Agents.AI.Workflows;

internal sealed partial class FileReadExecutor(): Executor("FileReadExecutor")
{
    /// <summary>
    /// Reads a file and stores its content in a shared state.
    /// </summary>
    /// <param name="message">The path to the embedded resource file.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The ID of the shared state where the file content is stored.</returns>
    [MessageHandler]
    private async ValueTask<string> HandleAsync(string message, IWorkflowContext context)
    {
        // Read file content from embedded resource
        string fileContent = File.ReadAllText(message);
        // Store file content in a shared state for access by other executors
        string fileID = Guid.NewGuid().ToString();
        await context.QueueStateUpdateAsync<string>(fileID, fileContent, scopeName: "FileContent");

        return fileID;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class FileReadExecutor(Executor):

    @handler
    async def handle(self, file_path: str, ctx: WorkflowContext[str]):
        # Read file content from embedded resource
        with open(file_path, 'r') as file:
            file_content = file.read()
        # Store file content in state for access by other executors
        file_id = str(uuid.uuid4())
        ctx.set_state(file_id, file_content)

        await ctx.send_message(file_id)

Stan dostępu

using Microsoft.Agents.AI.Workflows;

internal sealed partial class WordCountingExecutor() : Executor("WordCountingExecutor")
{
    /// <summary>
    /// Counts the number of words in the file content stored in a shared state.
    /// </summary>
    /// <param name="message">The ID of the shared state containing the file content.</param>
    /// <param name="context">The workflow context for accessing shared states.</param>
    /// <returns>The number of words in the file content.</returns>
    [MessageHandler]
    private async ValueTask<int> HandleAsync(string message, IWorkflowContext context)
    {
        // Retrieve the file content from the shared state
        var fileContent = await context.ReadStateAsync<string>(message, scopeName: "FileContent")
            ?? throw new InvalidOperationException("File content state not found");

        return fileContent.Split([' ', '\n', '\r'], StringSplitOptions.RemoveEmptyEntries).Length;
    }
}
from agent_framework import (
    Executor,
    WorkflowContext,
    handler,
)

class WordCountingExecutor(Executor):

    @handler
    async def handle(self, file_id: str, ctx: WorkflowContext[int]):
        # Retrieve the file content from state
        file_content = ctx.get_state(file_id)
        if file_content is None:
            raise ValueError("File content state not found")

        await ctx.send_message(len(file_content.split()))

Izolacja stanu

W rzeczywistych aplikacjach prawidłowe zarządzanie stanem ma kluczowe znaczenie podczas obsługi wielu zadań lub żądań. Bez odpowiedniej izolacji, stan współdzielony między różnymi wykonaniami przepływu pracy może prowadzić do nieoczekiwanego zachowania, uszkodzenia danych i sytuacji wyścigowych. W tej sekcji wyjaśniono, jak zapewnić izolację stanu w przepływach pracy programu Microsoft Agent Framework, zapewniając wgląd w najlepsze rozwiązania i typowe pułapki.

Modyfikowalne narzędzia budowania przepływów pracy a niezmienne przepływy pracy

Przepływy pracy są tworzone przez konstruktorów przepływu pracy. Konstruktory przepływów pracy są zwykle uważane za modyfikowalne, co oznacza, że można dodawać, modyfikować wykonawcę startowego lub inne konfiguracje po utworzeniu konstruktora, a nawet po utworzeniu przepływu pracy. Z drugiej strony przepływy pracy są niezmienne w tym, że po utworzeniu przepływu pracy nie można go modyfikować (nie można modyfikować publicznego interfejsu API w celu zmodyfikowania przepływu pracy).

To rozróżnienie jest ważne, ponieważ wpływa na sposób zarządzania stanem w różnych realizacjach procesu roboczego. Nie zaleca się ponownego użycia pojedynczego wystąpienia przepływu pracy dla wielu zadań bądź próśb, ponieważ może to prowadzić do niezamierzonego dzielenia się stanem. Zamiast tego zaleca się utworzenie nowego wystąpienia przepływu pracy z kreatora dla każdego zadania lub żądania, aby zapewnić prawidłową izolację stanu i bezpieczeństwo wątków.

Zapewnianie izolacji stanu za pomocą metod pomocniczych

Gdy wystąpienia funkcji wykonawczej są tworzone raz i współużytkowane w wielu kompilacjach przepływu pracy, ich stan wewnętrzny jest współużytkowany we wszystkich wykonaniach przepływu pracy. Może to prowadzić do problemów, jeśli funkcja wykonawcza zawiera stan modyfikowalny, który powinien być izolowany dla każdego przepływu pracy. Aby zapewnić właściwą izolację stanu i bezpieczeństwo wątków, opakuj inicjalizację wykonawcy i budowanie przepływu pracy w metodę pomocniczą, aby każde wywołanie tworzyło nowe, niezależne instancje.

Wkrótce...

Przykład nieizolowany (współdzielony stan):

executor_a = CustomExecutorA()
executor_b = CustomExecutorB()

# executor_a and executor_b are shared across all workflows built from this builder
workflow_builder = WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b)

workflow_a = workflow_builder.build()
workflow_b = workflow_builder.build()
# workflow_a and workflow_b share the same executor instances and their mutable state

Przykład izolowany (metoda pomocnika):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated state.

    Each call produces independent executor instances, ensuring no state
    leaks between workflow runs.
    """
    executor_a = CustomExecutorA()
    executor_b = CustomExecutorB()

    return WorkflowBuilder(start_executor=executor_a).add_edge(executor_a, executor_b).build()

# Each workflow has its own executor instances with independent state
workflow_a = create_workflow()
workflow_b = create_workflow()

Tip

Aby zapewnić właściwą izolację stanu i bezpieczne działanie wątków, upewnij się również, że egzekutory utworzone wewnątrz metody pomocniczej nie współdzielą zewnętrznego stanu modyfikowalnego.

Zarządzanie stanem agenta

Kontekst agenta jest zarządzany za pośrednictwem wątków agenta. Domyślnie każdy agent w przepływie pracy otrzyma własny wątek, chyba że agent jest zarządzany przez niestandardową funkcję wykonawcza. Aby uzyskać więcej informacji, zobacz Praca z agentami.

Wątki agenta są utrwalane w ramach przebiegów przepływu pracy. Oznacza to, że jeśli agent jest wywoływany w pierwszym uruchomieniu przepływu pracy, zawartość wygenerowana przez agenta będzie dostępna w kolejnych uruchomieniach tego samego wystąpienia przepływu pracy. Chociaż może to być przydatne do utrzymania ciągłości w ramach jednego zadania, może również prowadzić do niezamierzonego udostępniania stanu, jeśli to samo wystąpienie przepływu pracy jest ponownie używane dla różnych zadań lub żądań. Aby upewnić się, że każde zadanie ma stan izolowanego agenta, opakuj agenta i tworzenie przepływu pracy wewnątrz metody pomocniczej, aby każde wywołanie tworzyło nowe wystąpienia agenta z własnymi wątkami.

Wkrótce...

Przykład nieizolowany (stan udostępnionego agenta):

writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content writer. You create new content and edit contents based on the feedback."
    ),
    name="writer_agent",
)
reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
    instructions=(
        "You are an excellent content reviewer."
        "Provide actionable feedback to the writer about the provided content."
        "Provide the feedback in the most concise manner possible."
    ),
    name="reviewer_agent",
)

# writer_agent and reviewer_agent are shared across all workflows
workflow = WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

Przykład izolowany (metoda pomocnika):

def create_workflow() -> Workflow:
    """Create a fresh workflow with isolated agent state.

    Each call produces new agent instances with their own threads,
    ensuring no conversation history leaks between workflow runs.
    """
    writer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content writer. You create new content and edit contents based on the feedback."
        ),
        name="writer_agent",
    )
    reviewer_agent = AzureOpenAIChatClient(credential=AzureCliCredential()).as_agent(
        instructions=(
            "You are an excellent content reviewer."
            "Provide actionable feedback to the writer about the provided content."
            "Provide the feedback in the most concise manner possible."
        ),
        name="reviewer_agent",
    )

    return WorkflowBuilder(start_executor=writer_agent).add_edge(writer_agent, reviewer_agent).build()

# Each workflow has its own agent instances and threads
workflow_a = create_workflow()
workflow_b = create_workflow()

Podsumowanie

Izolacja stanu w przepływach pracy programu Microsoft Agent Framework może być skutecznie zarządzana przez encapsulację inicializacji wykonawcy i instancji agenta wraz z budowaniem przepływu pracy w ramach metod pomocniczych. Wywołując metodę pomocnika za każdym razem, gdy potrzebujesz nowego przepływu pracy, masz pewność, że każde wystąpienie ma nowy, niezależny stan, co pozwala uniknąć niezamierzonego udostępniania stanu między różnymi wykonaniami przepływu pracy.

Dalsze kroki