Grain 持久性

grain 可以有多个关联的命名持久数据对象。 这些状态对象在 grain 激活期间从存储中加载,因此在请求期间可供使用。 grain 持久性使用可扩展的插件模型,因此可以使用任何数据库的存储提供程序。 此持久性模型采用简便设计,不旨在涵盖所有数据访问模式。 grain 也可以直接访问数据库,而无需使用 grain 持久性模型。

在上图中,UserGrain 具有 Profile 状态和 Cart 状态,每个状态存储在单独的存储系统中。

目标

  1. 每个 grain 有多个命名持久数据对象。
  2. 配置了多个存储提供程序,每个提供程序可以采用不同的配置并由不同的存储系统提供支持。
  3. 存储提供程序可由社区开发和发布。
  4. 存储提供程序可以全面控制它们如何在持久后备存储中存储 grain 状态数据。 结论:Orleans 不提供全面的 ORM 存储解决方案,而是允许自定义存储提供程序在必要时支持特定的 ORM 要求。

可以在 NuGet 上找到 Orleans grain 存储提供程序。 官方维护的包包括:

API

grain 使用 IPersistentState<TState>(其中 TState 是可序列化状态类型)来与其持久状态交互:

public interface IPersistentState<TState> where TState : new()
{
    TState State { get; set; }
    string Etag { get; }
    Task ClearStateAsync();
    Task WriteStateAsync();
    Task ReadStateAsync();
}

IPersistentState<TState> 的实例作为构造函数参数注入到 grain 中。 可以使用 PersistentStateAttribute 属性批注这些参数,以标识要注入的状态的名称,以及提供该状态的存储提供程序的名称。 以下示例通过将两个命名状态注入 UserGrain 构造函数来演示这一点:

public class UserGrain : Grain, IUserGrain
{
    private readonly IPersistentState<ProfileState> _profile;
    private readonly IPersistentState<CartState> _cart;

    public UserGrain(
        [PersistentState("profile", "profileStore")] IPersistentState<ProfileState> profile,
        [PersistentState("cart", "cartStore")] IPersistentState<CartState> cart)
    {
        _profile = profile;
        _cart = cart;
    }
}

不同的 grain 类型可以使用不同的配置存储提供程序,即使两者的类型相同,也是如此;例如,连接到不同 Azure 存储帐户的两个不同 Azure 表存储提供程序实例。

读取状态

激活 grain 时,会自动读取 grain 状态,但 grain 需要负责在必要时显式触发任何已更改的 grain 状态的写入。

如果 grain 希望从后备存储中显式重新读取该 grain 的最新状态,则 grain 应调用 ReadStateAsync 方法。 这会通过存储提供程序从持久存储中重新加载 grain 状态;当 ReadStateAsync() 中的 Task 完成时,将覆盖并替换 grain 状态的先前内存中副本。

使用 State 属性访问状态值。 例如,以下方法访问上述代码中声明的配置文件状态:

public Task<string> GetNameAsync() => Task.FromResult(_profile.State.Name);

正常操作期间无需调用 ReadStateAsync();状态在激活期间会自动加载。 但是,ReadStateAsync() 可用于刷新外部修改的状态。

有关错误处理机制的详细信息,请参阅下面的故障模式部分。

写入状态

可以通过 State 属性修改状态。 修改的状态不会自动持久保存。 开发人员需要通过调用 WriteStateAsync 方法来确定何时持久保存状态。 例如,以下方法更新 State 上的属性并持久保存更新的状态:

public async Task SetNameAsync(string name)
{
    _profile.State.Name = name;
    await _profile.WriteStateAsync();
}

从概念上讲,Orleans 运行时将获取 grain 状态数据对象的深层副本,使其在执行任何写入操作期间可供使用。 在幕后,运行时可以使用优化规则和启发式方法来避免在某些情况下执行部分或全部深层复制,前提是保留预期的逻辑隔离语义。

有关错误处理机制的详细信息,请参阅下面的故障模式部分。

清除状态

ClearStateAsync 方法用于清除 grain 在存储中的状态。 根据提供程序的不同,此操作可以选择性地删除整个 grain 状态。

入门

在 grain 可以使用持久性之前,必须在 silo 上配置存储提供程序。

首先配置存储提供程序,一个用于保存配置文件状态,一个用于保存购物车状态:

var host = new HostBuilder()
    .UseOrleans(siloBuilder =>
    {
        siloBuilder.AddAzureTableGrainStorage(
            name: "profileStore",
            configureOptions: options =>
            {
                // Use JSON for serializing the state in storage
                options.UseJson = true;

                // Configure the storage connection key
                options.ConnectionString =
                    "DefaultEndpointsProtocol=https;AccountName=data1;AccountKey=SOMETHING1";
            })
            .AddAzureBlobGrainStorage(
                name: "cartStore",
                configureOptions: options =>
                {
                    // Use JSON for serializing the state in storage
                    options.UseJson = true;

                    // Configure the storage connection key
                    options.ConnectionString =
                        "DefaultEndpointsProtocol=https;AccountName=data2;AccountKey=SOMETHING2";
                });
    })
    .Build();

使用名称 "profileStore" 配置存储提供程序后,我们可以从某个 grain 访问此提供程序。

可通过两种主要方式将持久状态添加到 grain:

  1. 通过将 IPersistentState<TState> 注入 grain 的构造函数。
  2. 通过从 Grain<TGrainState> 继承。

为 grain 添加存储的建议方式是使用关联的 [PersistentState("stateName", "providerName")] 属性将 IPersistentState<TState> 注入 grain 的构造函数。 有关 Grain<TState> 的详细信息,请参阅下文。 这种方法仍受支持,但被视为遗留方法。

声明一个类来保存 grain 的状态:

[Serializable]
public class ProfileState
{
    public string Name { get; set; }

    public Date DateOfBirth
}

IPersistentState<ProfileState> 注入 grain 的构造函数:

public class UserGrain : Grain, IUserGrain
{
    private readonly IPersistentState<ProfileState> _profile;

    public UserGrain(
        [PersistentState("profile", "profileStore")]
        IPersistentState<ProfileState> profile)
    {
        _profile = profile;
    }
}

注意

配置文件状态在注入到构造函数时不会被加载,因此在注入时访问它是无效操作。 在调用 OnActivateAsync 之前加载状态。

现在 grain 有一个持久状态,接下来我们可以添加方法来读取和写入状态:

public class UserGrain : Grain, IUserGrain
    {
    private readonly IPersistentState<ProfileState> _profile;

    public UserGrain(
        [PersistentState("profile", "profileStore")]
        IPersistentState<ProfileState> profile)
    {
        _profile = profile;
    }

    public Task<string> GetNameAsync() => Task.FromResult(_profile.State.Name);

    public async Task SetNameAsync(string name)
    {
        _profile.State.Name = name;
        await _profile.WriteStateAsync();
    }
}

持久性操作 的故障模式

读取操作的故障模式

存储提供程序在初始读取特定 grain 的状态数据期间返回的故障将使该 grain 的激活操作失败;在这种情况下,不会调用该 grain 的 OnActivateAsync() 生命周期回调方法。 导致发生激活的原始 grain 请求将故障回复到调用方,就像 grain 激活期间发生任何其他故障时一样。 存储提供程序在读取特定 grain 的状态数据时遇到的故障将导致 ReadStateAsync()Task 引发异常。 grain 可以选择处理或忽略 Task 异常,就像 Orleans 中的任何其他 Task 一样。

由于缺少/错误的存储提供程序配置,任何尝试将消息发送到在 silo 启动时无法加载的 grain 的行为都将返回永久错误 BadProviderConfigException

写入操作的故障模式

存储提供程序在为特定 grain 写入状态数据时遇到的故障将导致 WriteStateAsync()Task 引发异常。 通常,这意味着如果 WriteStateAsync()Task 正确链接到此 grain 方法的最终 Task 返回值,则会向客户端调用方引发 grain 调用异常。 但是,在某些高级方案中,可以编写 grain 代码来专门处理此类写入错误,就像它们可以处理任何其他发生故障的 Task 一样。

执行错误处理/恢复代码的 grain 必须捕获异常/发生故障的 WriteStateAsync()Task,并且不重新引发这些异常,以表明它们已成功处理写入错误。

建议

使用 JSON 序列化或其他版本容错的序列化格式

代码会不断演变,这通常也包括存储类型。 为了适应这些变化,应配置适当的序列化程序。 对于大多数存储提供程序而言,可以通过 UseJson 或类似选项将 JSON 用作序列化格式。 确保在演进数据协定时,已存储的数据仍可加载。

使用 Grain<TState> 为 grain 添加存储

重要

使用 Grain<T> 为 grain 添加存储被视为遗留功能:应如前所述使用 IPersistentState<T> 添加 grain 存储。

Grain<T> 继承的 grain 类(其中 T 是需要持久保存的特定于应用程序的状态数据类型)的状态将自动从指定的存储中加载。

此类 grain 标有 StorageProviderAttribute(指定用于读取/写入此 grain 的状态数据的存储提供程序的命名实例)。

[StorageProvider(ProviderName="store1")]
public class MyGrain : Grain<MyGrainState>, /*...*/
{
  /*...*/
}

Grain<T> 基类为要调用的子类定义了以下方法:

protected virtual Task ReadStateAsync() { /*...*/ }
protected virtual Task WriteStateAsync() { /*...*/ }
protected virtual Task ClearStateAsync() { /*...*/ }

这些方法的行为对应于前面定义的 IPersistentState<TState> 上的相应方法。

创建存储提供程序

状态持久性 API 有两个部分:通过 IPersistentState<T>Grain<T> 公开给 grain 的 API,以及以 IGrainStorage 为中心的存储提供程序 API – 存储提供程序必须实现的接口:

/// <summary>
/// Interface to be implemented for a storage able to read and write Orleans grain state data.
/// </summary>
public interface IGrainStorage
{
    /// <summary>Read data function for this storage instance.</summary>
    /// <param name="grainType">Type of this grain [fully qualified class name]</param>
    /// <param name="grainReference">Grain reference object for this grain.</param>
    /// <param name="grainState">State data object to be populated for this grain.</param>
    /// <returns>Completion promise for the Read operation on the specified grain.</returns>
    Task ReadStateAsync(
        string grainType, GrainReference grainReference, IGrainState grainState);

    /// <summary>Write data function for this storage instance.</summary>
    /// <param name="grainType">Type of this grain [fully qualified class name]</param>
    /// <param name="grainReference">Grain reference object for this grain.</param>
    /// <param name="grainState">State data object to be written for this grain.</param>
    /// <returns>Completion promise for the Write operation on the specified grain.</returns>
    Task WriteStateAsync(
        string grainType, GrainReference grainReference, IGrainState grainState);

    /// <summary>Delete / Clear data function for this storage instance.</summary>
    /// <param name="grainType">Type of this grain [fully qualified class name]</param>
    /// <param name="grainReference">Grain reference object for this grain.</param>
    /// <param name="grainState">Copy of last-known state data object for this grain.</param>
    /// <returns>Completion promise for the Delete operation on the specified grain.</returns>
    Task ClearStateAsync(
        string grainType, GrainReference grainReference, IGrainState grainState);
}

通过实现此接口并注册该实现来创建自定义存储提供程序。 有关现有存储提供程序实现的示例,请参阅 AzureBlobGrainStorage

存储提供程序语义

特定于不透明提供程序的 Etag 值 (string) 可由存储提供程序设置为读取状态时填充的 grain 状态元数据的一部分。 如果某些提供程序不使用 Etag,则它们可以选择将此值保留为 null

当存储提供程序检测到 Etag 约束违规时,只要尝试执行写入操作都应导致写入 Task 发生故障并出现暂时性错误 InconsistentStateException,同时包装基础存储异常。

public class InconsistentStateException : OrleansException
{
    public InconsistentStateException(
    string message,
    string storedEtag,
    string currentEtag,
    Exception storageException)
        : base(message, storageException)
    {
        StoredEtag = storedEtag;
        CurrentEtag = currentEtag;
    }

    public InconsistentStateException(
        string storedEtag,
        string currentEtag,
        Exception storageException)
        : this(storageException.Message, storedEtag, currentEtag, storageException)
    {
    }

    /// <summary>The Etag value currently held in persistent storage.</summary>
    public string StoredEtag { get; }

    /// <summary>The Etag value currently held in memory, and attempting to be updated.</summary>
    public string CurrentEtag { get; }
}

存储操作出现的任何其他故障状况必须导致破坏返回的 Task,并出现指示潜在存储问题的异常。 在许多情况下,可以通过调用 grain 上的某个方法向调用方引发此异常,从而触发存储操作。 请务必考虑调用方是否能够反序列化此异常。 例如,客户端可能未加载包含异常类型的特定持久性库。 因此,建议将异常转换为可传播回到调用方的异常。

数据映射

各个存储提供程序应确定哪种 grain 状态存储方式 – Blob(各种格式/序列化形式)或每个字段为一列 – 是显而易见的最佳选择。

注册存储提供程序

Orleans 运行时将在创建 grain 时从服务提供程序(IServiceProvider) 解析存储提供程序。 运行时将解析 IGrainStorage 的实例。 如果存储提供程序已命名(例如通过 [PersistentState(stateName, storageName)] 属性命名),则会解析 IGrainStorage 的命名实例。

若要注册 IGrainStorage 的命名实例,请按照此处的 AzureTableGrainStorage 提供程序示例使用 AddSingletonNamedService 扩展方法。