通过


可重置执行器

概述

工作流中的执行程序通常是有状态的,例如,它们可能会累积消息、跟踪轮次计数或缓存中间结果。 在具有共享执行程序实例的多个运行中重复使用工作流时,以前的运行中的剩余状态可能会泄漏到后续运行中,从而导致意外行为或数据损坏。

IResettableExecutor 接口通过为执行器提供在运行之间清除其内部状态的协议来解决此问题。 运行完成时,工作流运行时会自动对共享执行器实例调用 ResetAsync(),以确保下一次运行的干净状态。

问题

请考虑在工作流运行期间收集消息的执行程序:

internal sealed partial class AggregationExecutor() : Executor("AggregationExecutor")
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }
}

如果此执行程序在工作流运行之间共享, _messages 则保留上一次运行中的数据。 第二次运行将看到不属于它的过时消息。

IResettableExecutor 接口

IResettableExecutor 定义工作流运行时在运行之间调用的单个方法:

public interface IResettableExecutor
{
    ValueTask ResetAsync();
}

当执行程序实现此接口时,运行时可以在每次运行后安全地重置它,从而允许在不过时状态的情况下重复使用工作流。

实现 IResettableExecutor

若要使有状态执行程序可重置,请实现接口并清除在 ResetAsync() 中的所有可变状态。

internal sealed partial class AggregationExecutor()
    : Executor("AggregationExecutor"), IResettableExecutor
{
    private readonly List<string> _messages = [];

    [MessageHandler]
    private async ValueTask HandleAsync(string message, IWorkflowContext context)
    {
        this._messages.Add(message);
        // Process aggregated messages...
    }

    public ValueTask ResetAsync()
    {
        this._messages.Clear();
        return default;
    }
}

有关使用可重置执行程序工作流的完整工作示例,请参阅 WorkflowAsAnAgent 示例

何时实现

并非所有执行程序都需要实现 IResettableExecutor。 使用此决策指南:

情景 实现? 原因
执行程序具有可变状态(列表、计数器、缓存),并在运行之间共享 是的 一次运行的状态将泄漏到下一个运行中
执行器是无状态的 无可重置
执行器是为每个工作流重新创建的(通过工厂方法) 每次运行都会获取一个处于干净状态的新实例
执行器声明为跨运行可共享(declareCrossRunShareable: true 跨运行可共享执行程序支持并发使用,而无需重置

警告

如果共享有状态执行程序未实现IResettableExecutor,重用工作流将引发InvalidOperationException

"Cannot reuse Workflow with shared Executor instances that do not implement IResettableExecutor."

运行时如何使用它

工作流运行时会自动管理重置生命周期。 你不需要自己调用 ResetAsync()。 序列为:

  1. 获得所有权 — 当工作流运行启动时,运行时获得工作流实例的所有权,并记录需要重置的执行程序。
  2. 运行执行 — 执行程序处理消息,并可能累积状态。
  3. 释放所有权 - 运行完成(或释放)时,运行时会释放所有权,并调用 ResetAsync() 实现 IResettableExecutor的所有共享执行程序实例。
  4. 准备好重复使用 — 成功重置后,工作流可用于新运行。

如果任何共享执行器无法重置(因为它未实现接口),则工作流将被标记为不可重用,且后续运行将引发错误。

与状态隔离的关系

IResettableExecutor 补充了状态管理中描述的辅助方法模式。 这两种方法满足不同的需求:

  • 帮助程序方法 (每次运行创建新的实例)提供最强大的隔离保证,并推荐作为默认方法。
  • IResettableExecutor 当需要跨运行共享执行程序实例时(例如,执行程序构造昂贵或工作流作为代理公开并在多个调用中重复使用时)时非常有用。

选择最适合你的方案的方法。 对于大多数工作流,辅助方法已足够。 共享实例时使用 IResettableExecutor 是有意的设计选择。

此概念不适用于 Python。 对于完整状态隔离,请为每个独立运行生成新的工作流和执行程序实例。 有关模式和示例,请参阅 状态管理

后续步骤